# MySQL Question Answering Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a MySQL-only, read-only data question-answering backend path for Neta Agents, with safe data-source configuration, schema/sample/query tools, cross-table JOIN support, query auditing, and a prompt skill.

**Architecture:** Add a `mysql` toolset backed by `netaclaw_data_source`, `SecretCryptoService`, `MysqlPoolManager`, schema/query services, and guarded Tool implementations. Security-critical rules live in services and tools; the `data-analyst-mysql` skill only guides Agent behavior.

**Tech Stack:** Midway.js, TypeORM, mysql2/promise, TypeBox tool schemas, Jest, NetaClaw Tool Catalog/Resolver/Runtime Policy, prompt Skill.

---

## File Structure

Create:

- `packages/backend/src/modules/netaclaw/entity/data_source.ts`: MySQL data-source configuration entity.
- `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`: query audit entity.
- `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`: shared AES-256-GCM helper for new encrypted secrets.
- `packages/backend/src/modules/netaclaw/service/data_source.ts`: admin/runtime data-source lookup, authorization, save, and test connection service.
- `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`: cached mysql2 pool manager.
- `packages/backend/src/modules/netaclaw/service/mysql_schema.ts`: information_schema mapping and schema/sample helpers.
- `packages/backend/src/modules/netaclaw/service/mysql_query.ts`: read-only SQL execution, masked-column rejection, row limits, audit.
- `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`: `mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query`.
- `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`: admin APIs for data-source list/save/test/delete.
- `packages/backend/skills/data-analyst-mysql/SKILL.md`: prompt skill for MySQL data analysis.
- `packages/backend/test/secret_crypto.test.ts`
- `packages/backend/test/mysql_sql_guard.test.ts`
- `packages/backend/test/mysql_schema_mapper.test.ts`
- `packages/backend/test/mysql_query_service.test.ts`
- `packages/backend/test/mysql_pool_manager.test.ts`
- `packages/backend/test/data_source_projection.test.ts`

Modify:

- `packages/backend/src/entities.ts`: add data source and query audit entity imports to the generated entity list. This file is marked generated, but existing tests assert it directly, so update it for this change.
- `packages/backend/src/modules/netaclaw/tools/catalog.ts`: add `TOOLSET_MYSQL` and import `./builtin/mysql.js`.
- `packages/backend/src/modules/netaclaw/tools/manifest.ts`: route `mysql_*` tools as main-process proxy, classify as custom/text/parallel.
- `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`: inject MySQL tool dependencies into runtime tool map.
- `packages/backend/test/entity_exports.test.ts`: assert new entities are exported.
- `packages/backend/test/tool_resolver.test.ts`: assert mysql toolset resolution and subagent route.

Do not implement frontend UI in this pass. The admin controller is enough for Phase 1/2 backend configuration.

---

## Task 1: Entities And Secret Crypto

**Files:**

- Create: `packages/backend/src/modules/netaclaw/entity/data_source.ts`
- Create: `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`
- Create: `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`
- Modify: `packages/backend/src/entities.ts`
- Modify: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/secret_crypto.test.ts`
- Test: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/data_source_projection.test.ts`

- [ ] **Step 1: Write the failing secret crypto test**

Add `packages/backend/test/secret_crypto.test.ts`:

```ts
import { SecretCryptoService } from '../src/modules/netaclaw/service/secret_crypto.js';

describe('SecretCryptoService', () => {
  const originalNetaSecret = process.env.NETA_SECRET_KEY;
  const originalAppSecret = process.env.APP_SECRET;

  beforeEach(() => {
    process.env.NETA_SECRET_KEY = 'unit-test-secret-key';
    delete process.env.APP_SECRET;
  });

  afterEach(() => {
    if (originalNetaSecret === undefined) delete process.env.NETA_SECRET_KEY;
    else process.env.NETA_SECRET_KEY = originalNetaSecret;
    if (originalAppSecret === undefined) delete process.env.APP_SECRET;
    else process.env.APP_SECRET = originalAppSecret;
  });

  it('encrypts text without embedding the plaintext', () => {
    const service = new SecretCryptoService();

    const encrypted = service.encryptText('mysql-password-123');
    const envelope = JSON.parse(Buffer.from(encrypted, 'base64').toString('utf8'));

    expect(encrypted).not.toContain('mysql-password-123');
    expect(envelope).toEqual(expect.objectContaining({ v: 1, alg: 'aes-256-gcm' }));
    expect(envelope.iv).toEqual(expect.any(String));
    expect(envelope.tag).toEqual(expect.any(String));
    expect(envelope.ct).toEqual(expect.any(String));
    expect(service.decryptText(encrypted)).toBe('mysql-password-123');
  });

  it('encrypts and decrypts JSON values', () => {
    const service = new SecretCryptoService();

    const encrypted = service.encryptJson({ password: 'secret', token: 'abc' });

    expect(encrypted).not.toContain('secret');
    expect(service.decryptJson(encrypted)).toEqual({ password: 'secret', token: 'abc' });
  });

  it('rejects tampered ciphertext', () => {
    const service = new SecretCryptoService();
    const encrypted = service.encryptText('secret');
    const tampered = encrypted.slice(0, -2) + 'xx';

    expect(() => service.decryptText(tampered)).toThrow();
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```

Expected: FAIL because `secret_crypto.ts` does not exist.

- [ ] **Step 3: Implement `SecretCryptoService`**

Create `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`:

```ts
import { Provide, Scope, ScopeEnum } from '@midwayjs/core';
import * as crypto from 'crypto';

@Provide()
@Scope(ScopeEnum.Singleton)
export class SecretCryptoService {
  private readonly algorithm = 'aes-256-gcm' as const;
  private readonly ivLength = 12; // 96-bit IV, the recommended size for GCM
  private readonly tagLength = 16; // default GCM auth tag size in Node

  // Hash the configured secret down to the 32 bytes AES-256 requires.
  private deriveKey(): Buffer {
    const raw = process.env.NETA_SECRET_KEY || process.env.APP_SECRET;
    if (!raw) {
      throw new Error('NETA_SECRET_KEY or APP_SECRET is required for secret encryption');
    }
    return crypto.createHash('sha256').update(raw).digest();
  }

  // Returns a base64-encoded JSON envelope: { v, alg, iv, tag, ct }.
  encryptText(value: string): string {
    const iv = crypto.randomBytes(this.ivLength);
    const cipher = crypto.createCipheriv(this.algorithm, this.deriveKey(), iv);
    const encrypted = Buffer.concat([cipher.update(value, 'utf8'), cipher.final()]);
    const tag = cipher.getAuthTag();
    return Buffer.from(JSON.stringify({
      v: 1,
      alg: this.algorithm,
      iv: iv.toString('base64'),
      tag: tag.toString('base64'),
      ct: encrypted.toString('base64'),
    }), 'utf8').toString('base64');
  }

  // Throws on an unknown envelope version, a malformed envelope, or a failed auth-tag check.
  decryptText(ciphertext: string): string {
    const envelope = JSON.parse(Buffer.from(ciphertext, 'base64').toString('utf8'));
    if (!envelope || envelope.v !== 1 || envelope.alg !== this.algorithm) {
      throw new Error('Unsupported ciphertext envelope');
    }
    const iv = Buffer.from(envelope.iv, 'base64');
    const tag = Buffer.from(envelope.tag, 'base64');
    const encrypted = Buffer.from(envelope.ct, 'base64');
    // Reject truncated IVs or tags before handing them to the cipher.
    if (iv.length !== this.ivLength || tag.length !== this.tagLength) {
      throw new Error('Malformed ciphertext envelope');
    }
    const decipher = crypto.createDecipheriv(this.algorithm, this.deriveKey(), iv);
    decipher.setAuthTag(tag);
    return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8');
  }

  encryptJson(value: Record<string, string>): string {
    return this.encryptText(JSON.stringify(value));
  }

  // Only flat objects are accepted; every value is coerced to a string.
  decryptJson(ciphertext: string): Record<string, string> {
    const parsed = JSON.parse(this.decryptText(ciphertext));
    if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
      throw new Error('Invalid encrypted JSON payload');
    }
    return Object.fromEntries(Object.entries(parsed).map(([key, value]) => [key, String(value)]));
  }
}
```
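
The envelope that `encryptText` produces is base64-encoded JSON carrying a version, the algorithm name, and base64 `iv`, `tag`, and `ct` fields. The standalone sketch below reproduces that round trip outside Midway to illustrate why a modified ciphertext is rejected. The names `seal`, `unseal`, and `flipFirstCiphertextBit`, and the `'example-secret'` key material, are illustrative assumptions and not part of this plan.

```ts
import * as crypto from 'crypto';

// Hypothetical key material; the real service reads NETA_SECRET_KEY or APP_SECRET.
const key = crypto.createHash('sha256').update('example-secret').digest();

interface Envelope { v: number; alg: string; iv: string; tag: string; ct: string }

// Encrypt and wrap the result in the same envelope shape the service uses.
function seal(plaintext: string): string {
  const iv = crypto.randomBytes(12);
  const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
  const ct = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  const envelope: Envelope = {
    v: 1,
    alg: 'aes-256-gcm',
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    ct: ct.toString('base64'),
  };
  return Buffer.from(JSON.stringify(envelope), 'utf8').toString('base64');
}

// Decrypt; decipher.final() throws if the auth tag does not match the ciphertext.
function unseal(sealed: string): string {
  const envelope: Envelope = JSON.parse(Buffer.from(sealed, 'base64').toString('utf8'));
  const decipher = crypto.createDecipheriv('aes-256-gcm', key, Buffer.from(envelope.iv, 'base64'));
  decipher.setAuthTag(Buffer.from(envelope.tag, 'base64'));
  const ct = Buffer.from(envelope.ct, 'base64');
  return Buffer.concat([decipher.update(ct), decipher.final()]).toString('utf8');
}

// Simulate tampering: flip one bit of the ciphertext but keep the envelope valid JSON.
function flipFirstCiphertextBit(sealed: string): string {
  const envelope: Envelope = JSON.parse(Buffer.from(sealed, 'base64').toString('utf8'));
  const ct = Buffer.from(envelope.ct, 'base64');
  ct[0] ^= 0x01;
  return Buffer.from(JSON.stringify({ ...envelope, ct: ct.toString('base64') }), 'utf8').toString('base64');
}
```

Unlike the `slice(0, -2) + 'xx'` tamper in the unit test, which mostly exercises JSON parsing, flipping a ciphertext bit keeps the envelope well-formed, so the failure comes from the GCM authentication tag itself.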

- [ ] **Step 4: Run test to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```

Expected: PASS.

- [ ] **Step 5: Write failing entity export test**

Modify `packages/backend/test/entity_exports.test.ts`:

```ts
import { entities } from '../src/entities.js';
import { NetaClawAgentSessionEntity } from '../src/modules/netaclaw/entity/agent_session.js';
import { NetaClawAgentSessionEntryEntity } from '../src/modules/netaclaw/entity/agent_session_entry.js';
import { NetaClawDataSourceEntity } from '../src/modules/netaclaw/entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../src/modules/netaclaw/entity/data_source_query_audit.js';

describe('entities exports', () => {
  it('includes SubagentSession entity', () => {
    const entityNames = entities
      .map(e => (typeof e === 'function' ? e.name : ''))
      .filter(Boolean);

    expect(entityNames.some(n => n.includes('SubagentSession'))).toBe(true);
  });

  it('includes session-tree MySQL entities', () => {
    expect(entities).toContain(NetaClawAgentSessionEntity);
    expect(entities).toContain(NetaClawAgentSessionEntryEntity);
  });

  it('includes MySQL question answering entities', () => {
    expect(entities).toContain(NetaClawDataSourceEntity);
    expect(entities).toContain(NetaClawDataSourceQueryAuditEntity);
  });
});
```

- [ ] **Step 6: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/entity_exports.test.ts
```

Expected: FAIL because entity files do not exist.

- [ ] **Step 7: Implement data source entities**

Create `packages/backend/src/modules/netaclaw/entity/data_source.ts`:

```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

export interface NetaClawDataSourceExtra {
  ssl?: boolean | Record<string, unknown>;
  connectTimeout?: number;
  queryTimeoutMs?: number;
  maxRows?: number;
  allowedTables?: string[];
  blockedTables?: string[];
  maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
  schemaVisibility?: 'allowed-only' | 'all-names-only';
  maxJoinTables?: number;
  poolConnectionLimit?: number;
}

@Entity('netaclaw_data_source')
export class NetaClawDataSourceEntity extends BaseEntity {
  @Index({ unique: true })
  @Column({ comment: '数据源唯一名', length: 100 })
  name: string;

  @Column({ comment: '显示名称', length: 200, nullable: true })
  label: string;

  @Index()
  @Column({ comment: '数据源类型', length: 20, default: 'mysql' })
  type: string;

  @Column({ comment: 'MySQL host', length: 255 })
  host: string;

  @Column({ comment: 'MySQL port', default: 3306 })
  port: number;

  @Column({ comment: '默认数据库', length: 128 })
  database: string;

  @Column({ comment: '连接用户名', length: 255 })
  username: string;

  @Column({ comment: '加密后的连接密码', type: 'text', nullable: true })
  passwordEncrypted: string;

  @Column({ comment: '是否只读', default: 1 })
  readonly: number;

  @Index()
  @Column({ comment: '状态 0=禁用 1=启用', default: 1 })
  status: number;

  @Column({ comment: '授权 Agent ID 列表', type: 'json', nullable: true })
  allowedAgentIds: number[];

  @Column({ comment: '扩展配置', type: 'json', nullable: true })
  extra: NetaClawDataSourceExtra;
}
```
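
Because the `extra` column is nullable and every field in `NetaClawDataSourceExtra` is optional, consumers need a single place to apply defaults and upper bounds. The sketch below shows one way to do that. It assumes the same defaults and clamps that the Task 2 SQL guard applies (`maxRows` 200 capped at 1000, `maxJoinTables` 4 capped at 6); `resolveLimits`, `clamp`, and the local `ExtraLimits` interface are hypothetical names introduced for illustration.

```ts
type MaskMode = 'hash' | 'partial' | 'redact';

// Local copy of the subset of NetaClawDataSourceExtra used in this sketch.
interface ExtraLimits {
  maxRows?: number;
  maxJoinTables?: number;
  allowedTables?: string[];
  maskedColumns?: Record<string, MaskMode>;
}

// Fall back when the value is missing or not a finite number, then bound it.
function clamp(value: number | undefined, fallback: number, min: number, max: number): number {
  const n = typeof value === 'number' && Number.isFinite(value) ? value : fallback;
  return Math.max(min, Math.min(n, max));
}

// Hypothetical helper: turn a possibly-null `extra` into fully populated limits.
function resolveLimits(extra: ExtraLimits | null | undefined) {
  return {
    maxRows: clamp(extra?.maxRows, 200, 1, 1000),
    maxJoinTables: clamp(extra?.maxJoinTables, 4, 1, 6),
    allowedTables: extra?.allowedTables ?? [],
    maskedColumns: extra?.maskedColumns ?? {},
  };
}

// A row saved without any extra config still gets safe, bounded limits.
console.log(resolveLimits(null));
// An oversized admin setting is capped rather than trusted.
console.log(resolveLimits({ maxRows: 50000, allowedTables: ['orders'] }));
```

Resolving limits once, close to where the entity is loaded, keeps the guard and the tools from each re-implementing their own fallbacks.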

Create `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`:

```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

@Entity('netaclaw_data_source_query_audit')
export class NetaClawDataSourceQueryAuditEntity extends BaseEntity {
  @Index()
  @Column({ comment: '数据源 ID' })
  dataSourceId: number;

  @Index()
  @Column({ comment: 'Agent ID', nullable: true })
  agentId: number;

  @Index()
  @Column({ comment: '用户 ID', nullable: true })
  userId: number;

  @Column({ comment: '工具调用 ID', length: 100, nullable: true })
  toolCallId: string;

  @Column({ comment: 'SQL SHA-256', length: 64 })
  sqlHash: string;

  @Column({ comment: 'SQL 摘要', type: 'text', nullable: true })
  sqlPreview: string;

  @Index()
  @Column({ comment: '状态 success/rejected/failed', length: 20 })
  status: string;

  @Column({ comment: '拒绝原因', length: 100, nullable: true })
  rejectReason: string;

  @Column({ comment: '耗时 ms', nullable: true })
  elapsedMs: number;

  @Column({ comment: '返回行数', nullable: true })
  rowCount: number;

  @Column({ comment: '错误代码', length: 50, nullable: true })
  errorCode: string;
}
```
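
The `sqlHash` column is 64 characters wide, which matches a hex-encoded SHA-256 digest, and `sqlPreview` holds a summary rather than the full statement. A minimal sketch of how a query service might derive those two fields is shown below. `buildAuditFields`, the `AuditFields` type, and the 500-character `PREVIEW_LIMIT` are assumptions made for illustration; the plan does not specify a preview length.

```ts
import * as crypto from 'crypto';

type AuditStatus = 'success' | 'rejected' | 'failed';

interface AuditFields {
  sqlHash: string;
  sqlPreview: string;
  status: AuditStatus;
  rejectReason?: string;
}

// Hypothetical cap on how much SQL text is stored for human review.
const PREVIEW_LIMIT = 500;

// Hypothetical helper: derive the hash and preview stored on an audit row.
function buildAuditFields(sql: string, status: AuditStatus, rejectReason?: string): AuditFields {
  const normalized = sql.trim();
  return {
    // Hex-encoded SHA-256 is always 64 characters, matching the column length.
    sqlHash: crypto.createHash('sha256').update(normalized, 'utf8').digest('hex'),
    sqlPreview: normalized.length > PREVIEW_LIMIT
      ? normalized.slice(0, PREVIEW_LIMIT) + '...'
      : normalized,
    status,
    rejectReason,
  };
}

const rejected = buildAuditFields('DELETE FROM orders', 'rejected', 'only_select_allowed');
console.log(rejected.sqlHash.length, rejected.status, rejected.rejectReason);
```

Storing a hash alongside a bounded preview lets identical statements be grouped in the audit table without keeping arbitrarily long SQL text.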

Modify `packages/backend/src/entities.ts` by adding imports after existing netaclaw imports:

```ts
import * as entity51 from './modules/netaclaw/entity/data_source';
import * as entity52 from './modules/netaclaw/entity/data_source_query_audit';
```

and append:

```ts
...Object.values(entity51),
...Object.values(entity52),
```

- [ ] **Step 8: Run entity and crypto tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts
```

Expected: PASS.

- [ ] **Step 9: Write failing data source projection test**

Add `packages/backend/test/data_source_projection.test.ts`:

```ts
import { NetaClawDataSourceService } from '../src/modules/netaclaw/service/data_source.js';

describe('NetaClawDataSourceService projections', () => {
  const source = {
    id: 1,
    name: 'sales_prod',
    label: '销售库',
    type: 'mysql',
    host: '127.0.0.1',
    port: 3306,
    database: 'sales',
    username: 'root',
    passwordEncrypted: 'ciphertext',
    readonly: 1,
    status: 1,
    allowedAgentIds: [9],
    extra: { allowedTables: ['orders'] },
  } as any;

  it('admin projection hides passwords but keeps connection metadata', () => {
    const service = new NetaClawDataSourceService();

    expect(service.toAdminSafe(source)).toEqual(expect.objectContaining({
      name: 'sales_prod',
      host: '127.0.0.1',
      username: 'root',
      hasPassword: true,
    }));
    expect(service.toAdminSafe(source)).not.toHaveProperty('passwordEncrypted');
  });

  it('agent projection hides host username password and permission details', () => {
    const service = new NetaClawDataSourceService();

    expect(service.toAgentSummary(source)).toEqual({
      name: 'sales_prod',
      label: '销售库',
      database: 'sales',
      status: 1,
    });
  });
});
```

- [ ] **Step 10: Run projection test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/data_source_projection.test.ts
```

Expected: FAIL because `toAdminSafe` and `toAgentSummary` do not exist.

- [ ] **Step 11: Implement safe projections**

Add the projection types to the data source service (`packages/backend/src/modules/netaclaw/service/data_source.ts`):

```ts
export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;
```

Replace any existing unsafe projection helper with:

```ts
toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
  const { passwordEncrypted, ...rest } = entity as any;
  return { ...rest, hasPassword: !!passwordEncrypted };
}

toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
  return {
    name: entity.name,
    label: entity.label,
    database: entity.database,
    status: entity.status,
  };
}
```

Update `listAdminSafe()` and `listForAgent()` to return the right projections:

```ts
async listAdminSafe(): Promise<AdminSafeDataSource[]> {
  const rows = await this.repo.find({ order: { id: 'DESC' } as any });
  return rows.map(row => this.toAdminSafe(row));
}

async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
  const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
  return rows
    .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
    .map(row => this.toAgentSummary(row));
}
```
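
The `listForAgent()` filter is deny-by-default: a source whose `allowedAgentIds` is `null` or empty is visible to no agent, and the projection strips connection details even for authorized agents. The in-memory sketch below mirrors that logic without a repository. `SourceRow`, `visibleToAgent`, and the sample rows are illustrative stand-ins, not the service's real types.

```ts
// Minimal stand-in for the entity fields the filter and projection touch.
interface SourceRow {
  name: string;
  label: string;
  database: string;
  status: number;
  host: string;
  allowedAgentIds: number[] | null;
}

// Hypothetical in-memory equivalent of listForAgent(): filter, then project.
function visibleToAgent(rows: SourceRow[], agentId: number) {
  return rows
    .filter(row => row.status === 1)
    // A null or missing list authorizes nobody; only explicit membership grants access.
    .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
    // Keep only the fields an agent may see; host and the ID list are dropped.
    .map(({ name, label, database, status }) => ({ name, label, database, status }));
}

const sampleRows: SourceRow[] = [
  { name: 'sales_prod', label: 'Sales', database: 'sales', status: 1, host: '10.0.0.5', allowedAgentIds: [9] },
  { name: 'hr_prod', label: 'HR', database: 'hr', status: 1, host: '10.0.0.6', allowedAgentIds: null },
  { name: 'old_sales', label: 'Old', database: 'sales_old', status: 0, host: '10.0.0.7', allowedAgentIds: [9] },
];

console.log(visibleToAgent(sampleRows, 9));
console.log(visibleToAgent(sampleRows, 3));
```

Filtering in application code after the query, as the plan does, avoids depending on JSON-containment operators in the database at the cost of loading every enabled row.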

Update `saveConfig()` to return `toAdminSafe(saved)`.

- [ ] **Step 12: Run tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts test/data_source_projection.test.ts
```

Expected: PASS.

- [ ] **Step 13: Commit**

```bash
git add packages/backend/src/modules/netaclaw/entity/data_source.ts packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts packages/backend/src/modules/netaclaw/service/secret_crypto.ts packages/backend/src/entities.ts packages/backend/test/secret_crypto.test.ts packages/backend/test/entity_exports.test.ts packages/backend/test/data_source_projection.test.ts
git commit -m "feat: add mysql data source entities and secret crypto"
```

---

## Task 2: SQL Guard

**Files:**

- Create: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_sql_guard.test.ts`

- [ ] **Step 1: Write failing SQL guard tests**

Add `packages/backend/test/mysql_sql_guard.test.ts`:

```ts
import { validateMysqlReadOnlySql } from '../src/modules/netaclaw/service/mysql_query.js';

describe('validateMysqlReadOnlySql', () => {
  const baseOptions = {
    allowedTables: ['orders', 'customers'],
    blockedTables: [],
    maxJoinTables: 4,
    maxRows: 200,
    maskedColumns: {},
  };

  it('accepts a SELECT with an explicit JOIN condition', () => {
    expect(validateMysqlReadOnlySql(
      'SELECT c.name, COUNT(*) FROM customers c JOIN orders o ON o.customer_id = c.id GROUP BY c.name LIMIT 20',
      baseOptions
    )).toEqual(expect.objectContaining({
      ok: true,
      tables: expect.arrayContaining(['customers', 'orders']),
    }));
  });

  it.each([
    'SHOW TABLES',
    'DESCRIBE orders',
    'EXPLAIN SELECT * FROM orders',
    'UPDATE orders SET amount = 1',
    'DELETE FROM orders',
    'DROP TABLE orders',
    'SELECT * FROM orders; SELECT * FROM customers',
    'SELECT * FROM orders -- comment',
    'SELECT * FROM orders # comment',
    'SELECT * FROM orders /* comment */',
    'WITH recent AS (SELECT * FROM orders) SELECT * FROM recent',
    'SELECT * FROM orders UNION SELECT * FROM customers',
    'SELECT @x := id FROM orders',
    'SELECT SLEEP(1)',
    'SELECT * INTO OUTFILE "/tmp/a" FROM orders',
  ])('rejects unsafe SQL: %s', sql => {
    const result = validateMysqlReadOnlySql(sql, baseOptions);
    expect(result.ok).toBe(false);
  });

  it('rejects unallowed tables', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM payments LIMIT 10', baseOptions);

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_not_allowed',
    }));
  });

  it('rejects blocked tables even when allowed', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10', {
      allowedTables: ['orders'],
      blockedTables: ['orders'],
      maxJoinTables: 4,
    });

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_blocked',
    }));
  });

  it('rejects explicit joins without ON or USING', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers JOIN orders LIMIT 10',
      baseOptions
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects more joined tables than allowed', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM a JOIN b ON b.a_id = a.id JOIN c ON c.b_id = b.id LIMIT 10',
      { allowedTables: ['a', 'b', 'c'], blockedTables: [], maxJoinTables: 2, maxRows: 200, maskedColumns: {} }
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'too_many_join_tables',
    }));
  });

  it('rejects a later JOIN that lacks its own ON or USING clause', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers c JOIN orders o ON o.customer_id = c.id JOIN invoices i LIMIT 10',
      { allowedTables: ['customers', 'orders', 'invoices'], blockedTables: [], maxJoinTables: 4, maxRows: 200, maskedColumns: {} }
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects explicit LIMIT values above maxRows', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10000', baseOptions);

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'limit_exceeds_max_rows',
    }));
  });

  it('rejects SQL that references masked columns', () => {
    const result = validateMysqlReadOnlySql('SELECT email AS e FROM customers LIMIT 10', {
      allowedTables: ['customers'],
      blockedTables: [],
      maxJoinTables: 4,
      maxRows: 200,
      maskedColumns: { 'customers.email': 'redact' },
    });

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'masked_column_denied',
    }));
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```

Expected: FAIL because `mysql_query.ts` does not export the guard.

- [ ] **Step 3: Implement SQL guard**

Create the initial `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with these exports:

```ts
export type MysqlSqlGuardOptions = {
  allowedTables?: string[];
  blockedTables?: string[];
  maxJoinTables?: number;
  maxRows?: number;
  maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
};

export type MysqlSqlGuardResult =
  | { ok: true; tables: string[]; limitedSql: string }
  | { ok: false; reason: string; tables?: string[] };

// Each pattern maps to the reject reason recorded when it matches.
const DENIED_PATTERNS: Array<[RegExp, string]> = [
  [/;/, 'multiple_statements_denied'],
  [/(--|#|\/\*|\*\/)/, 'comments_denied'],
  [/^\s*(show|describe|desc|explain)\b/i, 'metadata_sql_denied'],
  [/^\s*(insert|update|delete|replace|create|alter|drop|truncate|call|grant|revoke|start|begin|commit|rollback)\b/i, 'non_select_denied'],
  [/\bunion\b/i, 'union_denied'],
  [/^\s*with\b/i, 'cte_denied'],
  [/@[a-zA-Z0-9_]+/, 'user_variable_denied'],
  [/\btemporary\b/i, 'temporary_table_denied'],
  [/\b(load_file|sleep|benchmark)\s*\(/i, 'dangerous_function_denied'],
  [/\binto\s+(out|dump)?file\b/i, 'file_output_denied'],
  [/\bfrom\s+[^,\s]+(?:\s+\w+)?\s*,\s*[^,\s]+/i, 'implicit_join_denied'],
];

// Drop backticks and any schema prefix, then lowercase: `db`.`Orders` -> orders.
function normalizeIdentifier(value: string): string {
  return value.replace(/`/g, '').split('.').pop()!.toLowerCase();
}

function stripBackticks(value: string): string {
  return value.replace(/`/g, '').toLowerCase();
}

// Collect the distinct table names that follow FROM or JOIN.
export function extractMysqlTables(sql: string): string[] {
  const tables = new Set<string>();
  const tablePattern = /\b(?:from|join)\s+(`?[a-zA-Z0-9_]+`?(?:\.`?[a-zA-Z0-9_]+`?)?)/gi;
  let match: RegExpExecArray | null;
  while ((match = tablePattern.exec(sql))) {
    tables.add(normalizeIdentifier(match[1]));
  }
  return [...tables];
}

export function validateMysqlReadOnlySql(sql: string, options: MysqlSqlGuardOptions = {}): MysqlSqlGuardResult {
  const trimmed = String(sql || '').trim();
  if (!trimmed) return { ok: false, reason: 'sql_empty' };
  // Non-SELECT statements stop here; the prefix patterns below remain as defense in depth.
  if (!/^\s*select\b/i.test(trimmed)) return { ok: false, reason: 'only_select_allowed' };

  for (const [pattern, reason] of DENIED_PATTERNS) {
    if (pattern.test(trimmed)) return { ok: false, reason };
  }

  if (!everyJoinHasCondition(trimmed)) {
    return { ok: false, reason: 'join_condition_required' };
  }

  const tables = extractMysqlTables(trimmed);
  const allowed = new Set((options.allowedTables || []).map(item => normalizeIdentifier(item)));
  const blocked = new Set((options.blockedTables || []).map(item => normalizeIdentifier(item)));

  // The block list wins over the allow list; an empty allow list permits nothing.
  for (const table of tables) {
    if (blocked.has(table)) return { ok: false, reason: 'table_blocked', tables };
    if (allowed.size === 0 || !allowed.has(table)) return { ok: false, reason: 'table_not_allowed', tables };
  }

  const maxJoinTables = Math.max(1, Math.min(options.maxJoinTables ?? 4, 6));
  if (tables.length > maxJoinTables) {
    return { ok: false, reason: 'too_many_join_tables', tables };
  }

  const maxRows = Math.max(1, Math.min(options.maxRows ?? 200, 1000));
  // Covers both `LIMIT count` and `LIMIT offset, count`; in the second form the row count is the last number.
  const explicitLimit = /\blimit\s+(\d+)(?:\s*,\s*(\d+))?/i.exec(trimmed);
  if (explicitLimit && Number(explicitLimit[2] ?? explicitLimit[1]) > maxRows) {
    return { ok: false, reason: 'limit_exceeds_max_rows', tables };
  }

  // Any mention of a masked column name is rejected when its table is referenced.
  const normalizedSql = stripBackticks(trimmed);
  for (const key of Object.keys(options.maskedColumns || {})) {
    const [table, column] = key.toLowerCase().split('.');
    if (tables.includes(table) && new RegExp(`\\b${column}\\b`, 'i').test(normalizedSql)) {
      return { ok: false, reason: 'masked_column_denied', tables };
    }
  }

  // Queries without their own LIMIT are wrapped; the caller binds the `?` to maxRows.
  const limitedSql = /\blimit\s+\d+/i.test(trimmed)
    ? trimmed
    : `SELECT * FROM (${trimmed}) AS neta_limited_query LIMIT ?`;

  return { ok: true, tables, limitedSql };
}

// Every JOIN must carry ON or USING before the next JOIN or clause keyword.
function everyJoinHasCondition(sql: string): boolean {
  const joinPattern = /\bjoin\b/gi;
  let match: RegExpExecArray | null;
  while ((match = joinPattern.exec(sql))) {
    const rest = sql.slice(match.index + match[0].length);
    const boundary = /\b(join|where|group\s+by|order\s+by|limit)\b/i.exec(rest);
    const segment = boundary ? rest.slice(0, boundary.index) : rest;
    if (!/\b(on|using)\b/i.test(segment)) return false;
  }
  return true;
}
```
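
When the guard wraps a query, `limitedSql` ends with a `LIMIT ?` placeholder that the caller still has to bind. The sketch below shows one way a caller could pair the guarded SQL with its bound values. `toExecutable` and the `GuardOk` alias are hypothetical names; the detection reuses the same numeric-`LIMIT` test the guard applies when deciding whether to wrap.

```ts
// Shape of a successful guard result, copied from MysqlSqlGuardResult.
type GuardOk = { ok: true; tables: string[]; limitedSql: string };

// Hypothetical helper: pair the guarded SQL with the values that must be bound.
function toExecutable(result: GuardOk, maxRows: number): { sql: string; params: number[] } {
  // The guard only appends `LIMIT ?` when the SQL had no numeric LIMIT of its own,
  // so the same test tells us whether a value needs to be bound.
  const hasExplicitLimit = /\blimit\s+\d+/i.test(result.limitedSql);
  return { sql: result.limitedSql, params: hasExplicitLimit ? [] : [maxRows] };
}

const wrapped = toExecutable(
  {
    ok: true,
    tables: ['orders'],
    limitedSql: 'SELECT * FROM (SELECT id FROM orders) AS neta_limited_query LIMIT ?',
  },
  200
);

const explicit = toExecutable(
  { ok: true, tables: ['orders'], limitedSql: 'SELECT id FROM orders LIMIT 20' },
  200
);

console.log(wrapped.params, explicit.params);
```

The resulting `sql` and `params` pair can then be passed to a mysql2 `pool.query(sql, params)` call, so the row cap is supplied as a bound value rather than concatenated into the statement.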

- [ ] **Step 4: Run test to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```

Expected: PASS.

- [ ] **Step 5: Commit**

```bash
git add packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_sql_guard.test.ts
git commit -m "feat: add mysql read only sql guard"
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## Task 3: Data Source Service And Pool Manager
|
|||
|
|
|
|||
|
|
**Files:**
|
|||
|
|
|
|||
|
|
- Create: `packages/backend/src/modules/netaclaw/service/data_source.ts`
|
|||
|
|
- Create: `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`
|
|||
|
|
- Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
|
|||
|
|
- Test: `packages/backend/test/mysql_pool_manager.test.ts`
|
|||
|
|
|
|||
|
|
- [ ] **Step 1: Write failing pool manager tests**
|
|||
|
|
|
|||
|
|
Add `packages/backend/test/mysql_pool_manager.test.ts`:
|
|||
|
|
|
|||
|
|
```ts
import { MysqlPoolManager } from '../src/modules/netaclaw/service/mysql_pool.js';

describe('MysqlPoolManager', () => {
  it('reuses pools by data source id and clamps connectionLimit', async () => {
    const created: any[] = [];
    const createPool = jest.fn((config: any) => {
      const pool = { config, end: jest.fn().mockResolvedValue(undefined) };
      created.push(pool);
      return pool;
    });
    const manager = new MysqlPoolManager(createPool as any);
    const source: any = {
      id: 7,
      host: 'localhost',
      port: 3306,
      database: 'sales',
      username: 'root',
      passwordEncrypted: 'encrypted',
      extra: { poolConnectionLimit: 20, connectTimeout: 1000 },
    };

    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

    const first = await manager.getPool(source);
    const second = await manager.getPool(source);

    expect(first).toBe(second);
    expect(createPool).toHaveBeenCalledTimes(1);
    expect(created[0].config.connectionLimit).toBe(10);
    expect(created[0].config.password).toBe('plain');
  });

  it('closes and forgets a cached pool', async () => {
    const end = jest.fn().mockResolvedValue(undefined);
    const createPool = jest.fn(() => ({ end }));
    const manager = new MysqlPoolManager(createPool as any);
    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

    await manager.getPool({ id: 1, host: 'h', port: 3306, database: 'd', username: 'u', passwordEncrypted: 'p', extra: {} } as any);
    await manager.closePool(1);

    expect(end).toHaveBeenCalledTimes(1);
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```

Expected: FAIL because `mysql_pool.ts` does not exist.

- [ ] **Step 3: Implement pool manager**

Create `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`:

```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import mysql from 'mysql2/promise';
import type { Pool } from 'mysql2/promise';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';

type CreatePool = typeof mysql.createPool;

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlPoolManager {
  @Inject()
  secretCrypto: SecretCryptoService;

  private pools = new Map<number, Pool>();
  private readonly createPool: CreatePool;

  constructor(createPool: CreatePool = mysql.createPool) {
    this.createPool = createPool;
  }

  async getPool(source: NetaClawDataSourceEntity): Promise<Pool> {
    const existing = this.pools.get(source.id);
    if (existing) return existing;

    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 3), 10));
    const pool = this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
    this.pools.set(source.id, pool);
    return pool;
  }

  async closePool(sourceId: number): Promise<void> {
    const pool = this.pools.get(sourceId);
    if (!pool) return;
    this.pools.delete(sourceId);
    await pool.end();
  }

  createTransientPool(source: NetaClawDataSourceEntity): Pool {
    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 1), 3));
    return this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
  }
}
```
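
The constructor-injected factory is what lets the pool manager tests run without a MySQL server. A stripped-down sketch of the same cache-and-clamp pattern follows; `PoolCache`, `ClosablePool`, and `FakePool` are hypothetical names that exist only for this illustration, and the 1..10 clamp simply mirrors `getPool`.

```typescript
interface ClosablePool {
  end(): Promise<void>;
}

type PoolFactory<P extends ClosablePool> = (connectionLimit: number) => P;

// Hypothetical stand-in for the manager: one pool per data source id.
class PoolCache<P extends ClosablePool> {
  private readonly pools = new Map<number, P>();
  private readonly factory: PoolFactory<P>;

  constructor(factory: PoolFactory<P>) {
    this.factory = factory;
  }

  get(sourceId: number, requestedLimit: number = 3): P {
    const existing = this.pools.get(sourceId);
    if (existing) return existing;
    // Clamp the configured limit so one source cannot exhaust connections.
    const limit = Math.max(1, Math.min(requestedLimit, 10));
    const pool = this.factory(limit);
    this.pools.set(sourceId, pool);
    return pool;
  }

  async close(sourceId: number): Promise<void> {
    const pool = this.pools.get(sourceId);
    if (!pool) return;
    // Forget the pool before awaiting so concurrent callers get a fresh one.
    this.pools.delete(sourceId);
    await pool.end();
  }
}

// Usage with a fake factory instead of a real driver.
interface FakePool extends ClosablePool {
  limit: number;
}

let createdCount = 0;
const cache = new PoolCache<FakePool>(limit => {
  createdCount += 1;
  return { limit, end: async () => undefined };
});
const firstPool = cache.get(7, 20);
const secondPool = cache.get(7, 20);
```

Deleting the map entry before awaiting `end()` is a deliberate ordering: a caller that arrives while the old pool is draining receives a new pool rather than one that is shutting down.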

- [ ] **Step 4: Run pool tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```

Expected: PASS.

- [ ] **Step 5: Implement `NetaClawDataSourceService`**

Create `packages/backend/src/modules/netaclaw/service/data_source.ts`:

```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { NetaClawDataSourceEntity, NetaClawDataSourceExtra } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';
import { MysqlPoolManager } from './mysql_pool.js';

export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;

export type SaveDataSourceInput = Partial<NetaClawDataSourceEntity> & {
  password?: string;
};

@Provide()
@Scope(ScopeEnum.Singleton)
export class NetaClawDataSourceService {
  @InjectEntityModel(NetaClawDataSourceEntity)
  repo: Repository<NetaClawDataSourceEntity>;

  @Inject()
  secretCrypto: SecretCryptoService;

  @Inject()
  mysqlPoolManager: MysqlPoolManager;

  toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
    const { passwordEncrypted, ...rest } = entity as any;
    return { ...rest, hasPassword: !!passwordEncrypted };
  }

  toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
    return {
      name: entity.name,
      label: entity.label,
      database: entity.database,
      status: entity.status,
    };
  }

  async listAdminSafe(): Promise<AdminSafeDataSource[]> {
    const rows = await this.repo.find({ order: { id: 'DESC' } as any });
    return rows.map(row => this.toAdminSafe(row));
  }

  async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
    const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
    return rows
      .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
      .map(row => this.toAgentSummary(row));
  }

  async getAuthorizedSource(name: string, agentId: number): Promise<NetaClawDataSourceEntity> {
    const source = await this.repo.findOne({ where: { name, type: 'mysql', status: 1 } as any });
    if (!source) throw new Error('data_source_not_found');
    if (!Array.isArray(source.allowedAgentIds) || !source.allowedAgentIds.includes(agentId)) {
      throw new Error('data_source_not_authorized');
    }
    return source;
  }

  normalizeExtra(extra?: NetaClawDataSourceExtra | null): NetaClawDataSourceExtra {
    return {
      allowedTables: extra?.allowedTables ?? [],
      blockedTables: extra?.blockedTables ?? [],
      maskedColumns: extra?.maskedColumns ?? {},
      schemaVisibility: extra?.schemaVisibility ?? 'allowed-only',
      queryTimeoutMs: Math.max(1000, Math.min(Number(extra?.queryTimeoutMs ?? 10000), 30000)),
      maxRows: Math.max(1, Math.min(Number(extra?.maxRows ?? 200), 1000)),
      maxJoinTables: Math.max(1, Math.min(Number(extra?.maxJoinTables ?? 4), 6)),
      poolConnectionLimit: Math.max(1, Math.min(Number(extra?.poolConnectionLimit ?? 3), 10)),
      ssl: extra?.ssl,
      connectTimeout: extra?.connectTimeout,
    };
  }

  async saveConfig(input: SaveDataSourceInput): Promise<AdminSafeDataSource> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const entity = this.repo.create({
      ...(existing || {}),
      ...input,
      type: 'mysql',
      readonly: 1,
      status: input.status ?? existing?.status ?? 1,
      port: Number(input.port ?? existing?.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
      // Keep the stored table lists and limits when an update omits `extra`.
      extra: this.normalizeExtra(input.extra ?? existing?.extra),
    });
    if (input.password) {
      entity.passwordEncrypted = this.secretCrypto.encryptText(input.password);
    }
    const saved = await this.repo.save(entity);
    // Drop any cached pool so the next query picks up the new configuration.
    await this.mysqlPoolManager.closePool(saved.id);
    return this.toAdminSafe(saved);
  }

  async delete(id: number): Promise<void> {
    await this.repo.delete(id);
    await this.mysqlPoolManager.closePool(id);
  }

  async testConnection(input: SaveDataSourceInput): Promise<{ ok: boolean; error?: string }> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const source = this.repo.create({
      ...(existing || {}),
      ...input,
      id: input.id ?? -1,
      type: 'mysql',
      readonly: 1,
      // Fall back to the stored values so a saved source can be tested with a partial payload.
      port: Number(input.port ?? existing?.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
      extra: this.normalizeExtra(input.extra ?? existing?.extra),
      passwordEncrypted: input.password
        ? this.secretCrypto.encryptText(input.password)
        : input.passwordEncrypted ?? existing?.passwordEncrypted,
    } as any);
    let pool: any;
    try {
      pool = this.mysqlPoolManager.createTransientPool(source);
      await pool.query('SELECT 1 AS ok');
      return { ok: true };
    } catch (err: any) {
      return { ok: false, error: sanitizeMysqlConnectionError(err) };
    } finally {
      // A failure while closing must not replace the connection test result.
      if (pool) await pool.end().catch(() => undefined);
    }
  }
}

function sanitizeMysqlConnectionError(err: any): string {
  // Expose only the driver error code, never the raw message (it may contain host or user details).
  const code = err?.code || err?.sqlState || err?.errno;
  return code ? `mysql_connection_failed:${code}` : 'mysql_connection_failed';
}
```
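
The two read paths above differ in what they expose: the admin view hides only the secret, while the agent path also requires an explicit allow-list match. A minimal sketch of both rules, assuming a hypothetical `StoredSource` shape that carries just the fields relevant here (the real entity has more columns):

```typescript
// Hypothetical, trimmed-down stand-in for the data-source entity.
interface StoredSource {
  id: number;
  name: string;
  passwordEncrypted?: string | null;
  allowedAgentIds?: number[] | null;
}

type AdminSafe = Omit<StoredSource, 'passwordEncrypted'> & { hasPassword: boolean };

// Strip the encrypted secret and report only whether one is stored.
function toAdminSafe(source: StoredSource): AdminSafe {
  const { passwordEncrypted, ...rest } = source;
  return { ...rest, hasPassword: Boolean(passwordEncrypted) };
}

// Deny by default: a missing or malformed allow-list authorizes nobody.
function isAgentAuthorized(source: StoredSource, agentId: number): boolean {
  return Array.isArray(source.allowedAgentIds) && source.allowedAgentIds.includes(agentId);
}

const sample: StoredSource = { id: 1, name: 'sales', passwordEncrypted: 'ciphertext', allowedAgentIds: [3] };
const safe = toAdminSafe(sample);
```

Treating a `null` allow-list as "no agents" rather than "all agents" keeps a half-configured data source unreachable until an administrator grants access.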

- [ ] **Step 6: Run focused tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts test/secret_crypto.test.ts
```

Expected: PASS.

- [ ] **Step 7: Commit**

```bash
git add packages/backend/src/modules/netaclaw/service/data_source.ts packages/backend/src/modules/netaclaw/service/mysql_pool.ts packages/backend/test/mysql_pool_manager.test.ts
git commit -m "feat: add mysql data source services"
```

---

## Task 4: Schema Mapping And Query Execution

**Files:**

- Create: `packages/backend/src/modules/netaclaw/service/mysql_schema.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_schema_mapper.test.ts`
- Test: `packages/backend/test/mysql_query_service.test.ts`

- [ ] **Step 1: Write failing schema mapper test**

Add `packages/backend/test/mysql_schema_mapper.test.ts`:

```ts
import { mapMysqlSchemaRows } from '../src/modules/netaclaw/service/mysql_schema.js';

describe('mapMysqlSchemaRows', () => {
  it('maps columns, primary keys, indexes and foreign keys', () => {
    const result = mapMysqlSchemaRows({
      columns: [
        { tableName: 'orders', tableComment: '订单', columnName: 'id', columnType: 'int', isNullable: 'NO', columnKey: 'PRI', columnComment: 'ID' },
        { tableName: 'orders', tableComment: '订单', columnName: 'customer_id', columnType: 'int', isNullable: 'NO', columnKey: 'MUL', columnComment: '客户' },
      ],
      indexes: [
        { tableName: 'orders', indexName: 'PRIMARY', columnName: 'id', nonUnique: 0, seqInIndex: 1 },
        { tableName: 'orders', indexName: 'idx_customer', columnName: 'customer_id', nonUnique: 1, seqInIndex: 1 },
      ],
      foreignKeys: [
        { tableName: 'orders', columnName: 'customer_id', constraintName: 'fk_orders_customer', referencedTableName: 'customers', referencedColumnName: 'id' },
      ],
      maskedColumns: { 'orders.customer_id': 'redact' },
    });

    expect(result.tables).toEqual([
      expect.objectContaining({
        name: 'orders',
        comment: '订单',
        primaryKey: ['id'],
        columns: [
          expect.objectContaining({ name: 'id', type: 'int', nullable: false, key: 'PRI', masked: false }),
          expect.objectContaining({ name: 'customer_id', type: 'int', nullable: false, key: 'MUL', masked: true, maskMode: 'redact' }),
        ],
        indexes: expect.arrayContaining([
          expect.objectContaining({ name: 'idx_customer', columns: ['customer_id'], unique: false }),
        ]),
        foreignKeys: [
          expect.objectContaining({ column: 'customer_id', referencedTable: 'customers', referencedColumn: 'id' }),
        ],
      }),
    ]);
  });
});
```

- [ ] **Step 2: Run schema mapper test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts
```

Expected: FAIL because `mysql_schema.ts` does not exist.

- [ ] **Step 3: Implement schema mapper and service skeleton**

Create `packages/backend/src/modules/netaclaw/service/mysql_schema.ts` with:

```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { MysqlPoolManager } from './mysql_pool.js';

type MaskMode = 'hash' | 'partial' | 'redact';

export interface MysqlSchemaRows {
  columns: Array<{ tableName: string; tableComment?: string; columnName: string; columnType: string; isNullable: string; columnKey?: string; columnComment?: string }>;
  indexes: Array<{ tableName: string; indexName: string; columnName: string; nonUnique: number; seqInIndex: number }>;
  foreignKeys: Array<{ tableName: string; columnName: string; constraintName: string; referencedTableName: string; referencedColumnName: string }>;
  maskedColumns?: Record<string, MaskMode>;
}

export function mapMysqlSchemaRows(rows: MysqlSchemaRows) {
  const tableMap = new Map<string, any>();
  const getTable = (name: string, comment?: string) => {
    if (!tableMap.has(name)) {
      tableMap.set(name, { name, comment: comment || '', columns: [], primaryKey: [], indexes: [], foreignKeys: [] });
    }
    return tableMap.get(name);
  };

  for (const column of rows.columns) {
    const table = getTable(column.tableName, column.tableComment);
    const maskKey = `${column.tableName}.${column.columnName}`;
    const maskMode = rows.maskedColumns?.[maskKey];
    if (column.columnKey === 'PRI') table.primaryKey.push(column.columnName);
    table.columns.push({
      name: column.columnName,
      type: column.columnType,
      nullable: column.isNullable === 'YES',
      key: column.columnKey || '',
      comment: column.columnComment || '',
      masked: !!maskMode,
      maskMode: maskMode || null,
    });
  }

  // information_schema.STATISTICS returns one row per indexed column; regroup them per index.
  const indexGroups = new Map<string, any>();
  for (const index of rows.indexes) {
    const key = `${index.tableName}.${index.indexName}`;
    if (!indexGroups.has(key)) {
      indexGroups.set(key, { tableName: index.tableName, name: index.indexName, unique: index.nonUnique === 0, columns: [] as string[] });
    }
    indexGroups.get(key).columns[index.seqInIndex - 1] = index.columnName;
  }
  for (const index of indexGroups.values()) {
    getTable(index.tableName).indexes.push({ name: index.name, unique: index.unique, columns: index.columns.filter(Boolean) });
  }

  for (const fk of rows.foreignKeys) {
    getTable(fk.tableName).foreignKeys.push({
      name: fk.constraintName,
      column: fk.columnName,
      referencedTable: fk.referencedTableName,
      referencedColumn: fk.referencedColumnName,
    });
  }

  return { tables: [...tableMap.values()] };
}

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlIntrospectionService {
  @Inject()
  poolManager: MysqlPoolManager;

  async listSchema(source: NetaClawDataSourceEntity, options: { tables?: string[] } = {}) {
    const allowedTables = source.extra?.allowedTables || [];
    const requested = options.tables?.length ? options.tables : allowedTables;
    const visibleTables = requested.filter(table => allowedTables.includes(table) && !(source.extra?.blockedTables || []).includes(table));
    if (visibleTables.length === 0) return { tables: [] };

    const pool = await this.poolManager.getPool(source);
    const placeholders = visibleTables.map(() => '?').join(',');
    // Skeleton limitation: tableComment is selected as an empty string here,
    // so table comments stay blank until information_schema.TABLES is queried as well.
    const [columns] = await pool.query(
      `SELECT TABLE_NAME AS tableName, '' AS tableComment, COLUMN_NAME AS columnName, COLUMN_TYPE AS columnType, IS_NULLABLE AS isNullable, COLUMN_KEY AS columnKey, COLUMN_COMMENT AS columnComment
       FROM information_schema.COLUMNS
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders})
       ORDER BY TABLE_NAME, ORDINAL_POSITION`,
      [source.database, ...visibleTables]
    );
    const [indexes] = await pool.query(
      `SELECT TABLE_NAME AS tableName, INDEX_NAME AS indexName, COLUMN_NAME AS columnName, NON_UNIQUE AS nonUnique, SEQ_IN_INDEX AS seqInIndex
       FROM information_schema.STATISTICS
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders})
       ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX`,
      [source.database, ...visibleTables]
    );
    const [foreignKeys] = await pool.query(
      `SELECT TABLE_NAME AS tableName, COLUMN_NAME AS columnName, CONSTRAINT_NAME AS constraintName, REFERENCED_TABLE_NAME AS referencedTableName, REFERENCED_COLUMN_NAME AS referencedColumnName
       FROM information_schema.KEY_COLUMN_USAGE
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL`,
      [source.database, ...visibleTables]
    );
    return mapMysqlSchemaRows({
      columns: columns as MysqlSchemaRows['columns'],
      indexes: indexes as MysqlSchemaRows['indexes'],
      foreignKeys: foreignKeys as MysqlSchemaRows['foreignKeys'],
      maskedColumns: source.extra?.maskedColumns,
    });
  }

  async sampleTable(source: NetaClawDataSourceEntity, options: { table: string; columns?: string[]; limit?: number }) {
    const allowedTables = source.extra?.allowedTables || [];
    const blockedTables = source.extra?.blockedTables || [];
    if (!allowedTables.includes(options.table) || blockedTables.includes(options.table)) {
      throw new Error('table_not_allowed');
    }
    const schema = await this.listSchema(source, { tables: [options.table] });
    const table = schema.tables[0];
    if (!table) throw new Error('table_not_found');
    const visibleColumns = new Set<string>(table.columns.map((column: any) => column.name));
    // Without an explicit column list, sample every unmasked column; otherwise a
    // table with any masked column could never be sampled by default.
    const requestedColumns: string[] = options.columns?.length
      ? options.columns
      : table.columns.filter((column: any) => !column.masked).map((column: any) => column.name);
    if (requestedColumns.length === 0) throw new Error('column_not_allowed');
    for (const column of requestedColumns) {
      if (!/^[A-Za-z0-9_]+$/.test(column) || !visibleColumns.has(column)) {
        throw new Error('column_not_allowed');
      }
      if (source.extra?.maskedColumns?.[`${options.table}.${column}`]) {
        throw new Error('masked_column_denied');
      }
    }
    const safeTable = `\`${options.table.replace(/`/g, '')}\``;
    const safeColumns = requestedColumns.map(column => `\`${column.replace(/`/g, '')}\``).join(', ');
    const limit = Math.max(1, Math.min(Number(options.limit ?? 5), 20));
    const pool = await this.poolManager.getPool(source);
    const [rows, fields] = await pool.query(`SELECT ${safeColumns} FROM ${safeTable} LIMIT ?`, [limit]);
    return {
      columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : requestedColumns,
      rows: Array.isArray(rows) ? rows : [],
      rowCount: Array.isArray(rows) ? rows.length : 0,
      truncated: false,
    };
  }
}
```
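
The index step in the mapper is worth isolating, because `information_schema.STATISTICS` yields one row per indexed column rather than one row per index. A minimal sketch of the regrouping, where `groupIndexRows`, `IndexRow`, and `IndexInfo` are hypothetical names for illustration and `nonUnique` is assumed to arrive as a number:

```typescript
interface IndexRow {
  tableName: string;
  indexName: string;
  columnName: string;
  nonUnique: number;
  seqInIndex: number; // 1-based position of the column inside the index
}

interface IndexInfo {
  table: string;
  name: string;
  unique: boolean;
  columns: string[];
}

// Collapse per-column rows into one entry per index, ordered by seqInIndex.
function groupIndexRows(rows: IndexRow[]): IndexInfo[] {
  const groups = new Map<string, IndexInfo>();
  for (const row of rows) {
    const key = `${row.tableName}.${row.indexName}`;
    let group = groups.get(key);
    if (!group) {
      group = { table: row.tableName, name: row.indexName, unique: row.nonUnique === 0, columns: [] };
      groups.set(key, group);
    }
    // Place by position so the result does not depend on row order.
    group.columns[row.seqInIndex - 1] = row.columnName;
  }
  // Drop holes left by missing sequence numbers.
  return Array.from(groups.values()).map(group => ({ ...group, columns: group.columns.filter(Boolean) }));
}

const grouped = groupIndexRows([
  { tableName: 'orders', indexName: 'idx_a_b', columnName: 'b', nonUnique: 1, seqInIndex: 2 },
  { tableName: 'orders', indexName: 'idx_a_b', columnName: 'a', nonUnique: 1, seqInIndex: 1 },
  { tableName: 'orders', indexName: 'PRIMARY', columnName: 'id', nonUnique: 0, seqInIndex: 1 },
]);
```

Writing each column at `seqInIndex - 1` instead of pushing keeps composite indexes in their declared order even if the driver returns rows unsorted.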

- [ ] **Step 4: Run schema mapper test to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts
```

Expected: PASS.

- [ ] **Step 5: Write failing query service test**

Add `packages/backend/test/mysql_query_service.test.ts`:

```ts
import { MysqlQueryService } from '../src/modules/netaclaw/service/mysql_query.js';

describe('MysqlQueryService', () => {
  it('executes a guarded query and writes audit', async () => {
    const query = jest.fn().mockResolvedValue([
      [
        { id: 1, total: 99 },
      ],
      [{ name: 'id' }, { name: 'total' }],
    ]);
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn().mockResolvedValue({ query }) } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    const result = await service.executeReadOnly({
      source: {
        id: 1,
        database: 'sales',
        extra: {
          allowedTables: ['orders'],
          blockedTables: [],
          maskedColumns: {},
          maxRows: 200,
          maxJoinTables: 4,
          queryTimeoutMs: 10000,
        },
      } as any,
      sql: 'SELECT id, total FROM orders LIMIT 10',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-1',
    });

    expect(result.rows).toEqual([{ id: 1, total: 99 }]);
    expect(result.rowCount).toBe(1);
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      dataSourceId: 1,
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-1',
      status: 'success',
      rowCount: 1,
    }));
  });

  it('rejects masked column references before querying mysql', async () => {
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn() } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    await expect(service.executeReadOnly({
      source: {
        id: 1,
        extra: {
          allowedTables: ['orders'],
          blockedTables: [],
          maskedColumns: { 'orders.email': 'redact' },
          maxRows: 200,
          maxJoinTables: 4,
        },
      } as any,
      sql: 'SELECT email AS e FROM orders LIMIT 10',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-masked',
    })).rejects.toThrow('mysql_sql_rejected');

    expect(service.poolManager.getPool).not.toHaveBeenCalled();
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      status: 'rejected',
      rejectReason: 'masked_column_denied',
    }));
  });

  it('audits rejected SQL', async () => {
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn() } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    await expect(service.executeReadOnly({
      source: { id: 1, extra: { allowedTables: ['orders'], blockedTables: [] } } as any,
      sql: 'DROP TABLE orders',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-2',
    })).rejects.toThrow('mysql_sql_rejected');

    expect(service.poolManager.getPool).not.toHaveBeenCalled();
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      status: 'rejected',
    }));
  });
});
```

- [ ] **Step 6: Run query service test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_query_service.test.ts
```

Expected: FAIL because `MysqlQueryService` is not implemented.

- [ ] **Step 7: Implement query service**

Extend `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with:

```ts
|
|||
|
|
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
|
|||
|
|
import { InjectEntityModel } from '@midwayjs/typeorm';
|
|||
|
|
import { Repository } from 'typeorm';
|
|||
|
|
import * as crypto from 'crypto';
|
|||
|
|
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
|
|||
|
|
import { NetaClawDataSourceQueryAuditEntity } from '../entity/data_source_query_audit.js';
|
|||
|
|
import { MysqlPoolManager } from './mysql_pool.js';
|
|||
|
|
|
|||
|
|
export interface ExecuteReadOnlyParams {
|
|||
|
|
source: NetaClawDataSourceEntity;
|
|||
|
|
sql: string;
|
|||
|
|
agentId?: number;
|
|||
|
|
userId?: number;
|
|||
|
|
toolCallId?: string;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
function hashSql(sql: string): string {
|
|||
|
|
return crypto.createHash('sha256').update(sql).digest('hex');
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
function sanitizeMysqlError(err: any): string {
|
|||
|
|
const code = err?.code || err?.sqlState || err?.errno;
|
|||
|
|
return code ? `mysql_query_failed:${code}` : 'mysql_query_failed';
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
@Provide()
|
|||
|
|
@Scope(ScopeEnum.Singleton)
|
|||
|
|
export class MysqlQueryService {
|
|||
|
|
@Inject()
|
|||
|
|
poolManager: MysqlPoolManager;
|
|||
|
|
|
|||
|
|
@InjectEntityModel(NetaClawDataSourceQueryAuditEntity)
|
|||
|
|
auditRepo: Repository<NetaClawDataSourceQueryAuditEntity>;
|
|||
|
|
|
|||
|
|
async executeReadOnly(params: ExecuteReadOnlyParams) {
|
|||
|
|
const start = Date.now();
|
|||
|
|
const extra = params.source.extra || {};
|
|||
|
|
const guard = validateMysqlReadOnlySql(params.sql, {
|
|||
|
|
allowedTables: extra.allowedTables || [],
|
|||
|
|
blockedTables: extra.blockedTables || [],
|
|||
|
|
maxJoinTables: extra.maxJoinTables || 4,
|
|||
|
|
maxRows: extra.maxRows || 200,
|
|||
|
|
maskedColumns: extra.maskedColumns || {},
|
|||
    });

    if (!guard.ok) {
      await this.writeAudit(params, 'rejected', start, 0, guard.reason);
      throw new Error(`mysql_sql_rejected: ${guard.reason}`);
    }

    try {
      const pool = await this.poolManager.getPool(params.source);
      // Clamp configured limits to safe bounds: 1..1000 rows, 1..30 s timeout.
      const maxRows = Math.max(1, Math.min(Number(extra.maxRows ?? 200), 1000));
      const timeout = Math.max(1000, Math.min(Number(extra.queryTimeoutMs ?? 10000), 30000));
      // A `?` placeholder in limitedSql is treated as a LIMIT appended by the guard.
      const appendedLimit = guard.limitedSql.includes('?');
      const sql = guard.limitedSql;
      // Fetch one extra row so truncation can be detected.
      const sqlParams = appendedLimit ? [maxRows + 1] : [];
      const [rows, fields] = await pool.query({ sql, timeout } as any, sqlParams);
      const fetchedRows = Array.isArray(rows) ? (rows as any[]) : [];
      const truncated = appendedLimit && fetchedRows.length > maxRows;
      const resultRows = truncated ? fetchedRows.slice(0, maxRows) : fetchedRows;
      await this.writeAudit(params, 'success', start, resultRows.length, null);
      return {
        columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : Object.keys(resultRows[0] || {}),
        rows: resultRows,
        rowCount: resultRows.length,
        truncated,
        elapsedMs: Date.now() - start,
        sql: params.sql,
      };
    } catch (err: any) {
      // Audit only the driver error code; the thrown message is sanitized.
      await this.writeAudit(params, 'failed', start, 0, null, err?.code || err?.errno || err?.sqlState);
      throw new Error(sanitizeMysqlError(err));
    }
  }

  private async writeAudit(
    params: ExecuteReadOnlyParams,
    status: 'success' | 'rejected' | 'failed',
    start: number,
    rowCount: number,
    rejectReason?: string | null,
    errorCode?: string | number | null
  ) {
    await this.auditRepo.save({
      dataSourceId: params.source.id,
      agentId: params.agentId ?? null,
      userId: params.userId ?? null,
      toolCallId: params.toolCallId ?? null,
      sqlHash: hashSql(params.sql),
      // Store at most the first 1000 characters of the SQL text.
      sqlPreview: params.sql.slice(0, 1000),
      status,
      rejectReason: rejectReason ?? null,
      elapsedMs: Date.now() - start,
      rowCount,
      errorCode: errorCode ? String(errorCode) : null,
    } as any);
  }
}
```
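
The row-limit handling above relies on two small ideas: clamping configured values into a safe range, and fetching `maxRows + 1` rows so that the presence of an extra row signals truncation. The following is a minimal standalone sketch of both, not the service code itself. `clampSetting` and `applyRowLimit` are hypothetical helper names introduced only for illustration. Unlike the inline `Math.max`/`Math.min` expression, this sketch also falls back to the default when the configured value is not a finite number, which is an assumption about the desired behavior.

```typescript
// Hypothetical helpers for illustration; not part of the planned service.
interface PageResult<T> {
  rows: T[];
  truncated: boolean;
}

// Clamp a configured value into [min, max]. Missing or non-numeric
// values fall back to the default before clamping.
function clampSetting(value: unknown, fallback: number, min: number, max: number): number {
  const parsed = Number(value ?? fallback);
  const safe = Number.isFinite(parsed) ? parsed : fallback;
  return Math.max(min, Math.min(safe, max));
}

// Given rows fetched with LIMIT maxRows + 1, report whether more rows
// existed and return at most maxRows of them.
function applyRowLimit<T>(fetched: T[], maxRows: number): PageResult<T> {
  const truncated = fetched.length > maxRows;
  return { rows: truncated ? fetched.slice(0, maxRows) : fetched, truncated };
}

const exampleMaxRows = clampSetting(undefined, 200, 1, 1000);
const examplePage = applyRowLimit([1, 2, 3], 2);
console.log(exampleMaxRows, examplePage.rows.length, examplePage.truncated);
```

Fetching one extra row avoids a separate `COUNT(*)` query: the caller learns that more data exists without paying for an exact total.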

- [ ] **Step 8: Run schema/query tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_sql_guard.test.ts
```

Expected: PASS.

- [ ] **Step 9: Commit**

```bash
git add packages/backend/src/modules/netaclaw/service/mysql_schema.ts packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_schema_mapper.test.ts packages/backend/test/mysql_query_service.test.ts
git commit -m "feat: add mysql schema and query services"
```

---

## Task 5: MySQL Agent Tools And Resolver Wiring

**Files:**

- Create: `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/catalog.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/manifest.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`
- Test: `packages/backend/test/tool_resolver.test.ts`

- [ ] **Step 1: Write failing resolver tests**

Append to `packages/backend/test/tool_resolver.test.ts`:

```ts
it('resolves mysql toolset tools when enabled for an agent', async () => {
  service.toolRegistry = {
    all: jest.fn().mockResolvedValue([
      { name: 'mysql_list_sources', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
      { name: 'mysql_schema', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
      { name: 'mysql_table_sample', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
      { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
    ]),
  } as any;
  service.skillLoader = {} as any;
  service.dataSourceService = { listForAgent: jest.fn().mockResolvedValue([]) } as any;
  service.mysqlIntrospection = {} as any;
  service.mysqlQuery = {} as any;

  const result = await service.resolve({
    agent: { id: 9, toolsets: ['mysql'], tools: { disabled: [] } } as any,
    modelCapability: 'text',
  });

  expect(result.toolNames).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
  expect(result.tools.map(tool => tool.name)).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
});

it('routes mysql tools through main process proxy for subagent workers', async () => {
  service.toolRegistry = {
    all: jest.fn().mockResolvedValue([
      { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
    ]),
  } as any;
  service.skillLoader = {} as any;
  service.dataSourceService = {} as any;
  service.mysqlIntrospection = {} as any;
  service.mysqlQuery = {} as any;

  const result = await service.resolve({
    agent: {
      tools: { inheritCoreTools: false, enabled: ['mysql_query'], disabled: [] },
    } as any,
    modelCapability: 'text',
    delegationRole: 'subagent',
  });

  expect(result.toolManifest).toEqual([
    expect.objectContaining({
      name: 'mysql_query',
      supportedInWorker: false,
      workerRoutingHint: 'main-process-proxy',
    }),
  ]);
});
```

- [ ] **Step 2: Run resolver test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts
```

Expected: FAIL because mysql tools are not registered or injected.

- [ ] **Step 3: Implement MySQL tools**

Create `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`:

```ts
import { Type } from '@sinclair/typebox';
import { AgentToolWithMeta, jsonResult } from '../common.js';
import { registerSchema } from '../catalog.js';
import { NetaClawDataSourceService } from '../../service/data_source.js';
import { MysqlIntrospectionService } from '../../service/mysql_schema.js';
import { MysqlQueryService } from '../../service/mysql_query.js';

export const TOOLSET_MYSQL = 'mysql' as const;

// Runtime context injected by the tool runtime; not supplied by the model.
const Runtime = Type.Optional(Type.Any());
const SourceParam = Type.Object({
  source: Type.String({ description: 'Data source name' }),
  _netaRuntime: Runtime,
});

// Every MySQL tool requires the calling agent's id for authorization.
function readAgentId(params: any): number {
  const id = params?._netaRuntime?.currentAgent?.id;
  if (!id) throw new Error('current_agent_id_missing');
  return Number(id);
}

// The user id is optional and is only recorded in the audit log.
function readUserId(params: any): number | undefined {
  const id = params?._netaRuntime?.userId;
  return id === undefined || id === null ? undefined : Number(id);
}

export function createMysqlListSourcesTool(deps: {
  dataSourceService: NetaClawDataSourceService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Object({ _netaRuntime: Runtime });
  return {
    name: 'mysql_list_sources',
    label: 'MySQL data source list',
    description: 'List the MySQL data sources the current Agent is authorized to use. Connection passwords are never returned.',
    parameters: Params,
    async execute(_id, params) {
      return jsonResult({ sources: await deps.dataSourceService.listForAgent(readAgentId(params)) });
    },
  };
}

export function createMysqlSchemaTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    tables: Type.Optional(Type.Array(Type.String())),
  })]);
  return {
    name: 'mysql_schema',
    label: 'MySQL Schema',
    description: 'Read column, index, primary-key, and foreign-key information for authorized MySQL tables.',
    parameters: Params,
    async execute(_id, params) {
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlIntrospection.listSchema(source, { tables: params.tables }));
    },
  };
}

export function createMysqlTableSampleTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    table: Type.String(),
    columns: Type.Optional(Type.Array(Type.String())),
    limit: Type.Optional(Type.Number()),
  })]);
  return {
    name: 'mysql_table_sample',
    label: 'MySQL table sample',
    description: 'Read a small number of sample values from an authorized table to help determine column meaning and JOIN keys.',
    parameters: Params,
    async execute(_id, params) {
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlIntrospection.sampleTable(source, {
        table: params.table,
        columns: params.columns,
        limit: params.limit,
      }));
    },
  };
}

export function createMysqlQueryTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlQuery: MysqlQueryService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    sql: Type.String(),
  })]);
  return {
    name: 'mysql_query',
    label: 'MySQL query',
    description: 'Execute read-only SELECT SQL on an authorized data source. Cross-table JOINs are supported within the authorized tables.',
    parameters: Params,
    async execute(id, params) {
      // Authorize the source before any SQL reaches the query service.
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlQuery.executeReadOnly({
        source,
        sql: params.sql,
        agentId: readAgentId(params),
        userId: readUserId(params),
        toolCallId: id,
      }));
    },
  };
}

// Register catalog metadata for all four tools under the `mysql` toolset.
for (const name of ['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']) {
  registerSchema({
    name,
    toolset: TOOLSET_MYSQL,
    description: name,
    visibility: 'tool',
    capability: 'text',
    isCore: false,
    canDisable: true,
  });
}
```

- [ ] **Step 4: Wire catalog, manifest, resolver**

Modify `packages/backend/src/modules/netaclaw/tools/catalog.ts`:

```ts
export const TOOLSET_MYSQL = 'mysql' as const;
import './builtin/mysql.js';
```

Modify `packages/backend/src/modules/netaclaw/tools/manifest.ts`:

```ts
const MAIN_PROCESS_PROXY_TOOLS = new Set([
  'write_file',
  'edit',
  'patch',
  'read_skill',
  'read_skill_file',
  'skill_manage',
  'memory_save',
  'memory_recall',
  'mysql_list_sources',
  'mysql_schema',
  'mysql_table_sample',
  'mysql_query',
]);

export function toToolKind(name: string): SubagentToolManifestItem['kind'] {
  if (name.startsWith('mysql_')) return 'custom';
  if (name.startsWith('memory_')) return 'memory';
  if (name.startsWith('read_skill') || name === 'skill_manage') return 'skill';
  if (name === 'delegate_task' || name === 'delegate_parallel') return 'delegation';
  if (name === 'escalate') return 'admin';
  if (
    name === 'bash'
    || name === 'read_file'
    || name === 'write_file'
    || name === 'list_dir'
    || name === 'find_files'
    || name === 'grep'
    || name === 'edit'
    || name === 'patch'
    || name === 'clarify'
  ) {
    return 'builtin';
  }
  return 'custom';
}
```
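
To make the effect of `MAIN_PROCESS_PROXY_TOOLS` concrete, here is a minimal sketch of how membership in that set could translate into the manifest fields asserted in the Step 1 test (`supportedInWorker: false`, `workerRoutingHint: 'main-process-proxy'`). The `ManifestItemSketch` type and `toManifestItem` function are hypothetical simplifications; the real `SubagentToolManifestItem` in `manifest.ts` may carry more fields and derive them differently.

```typescript
// Hypothetical, trimmed-down shapes for illustration only.
type WorkerRoutingHint = 'main-process-proxy' | 'worker-local';

interface ManifestItemSketch {
  name: string;
  supportedInWorker: boolean;
  workerRoutingHint: WorkerRoutingHint;
}

// Only the MySQL entries are listed here; the real set has more tools.
const PROXY_TOOLS_SKETCH = new Set<string>([
  'mysql_list_sources',
  'mysql_schema',
  'mysql_table_sample',
  'mysql_query',
]);

// Tools in the proxy set are executed by the main process on behalf
// of the worker; all other tools are assumed to run worker-locally.
function toManifestItem(name: string): ManifestItemSketch {
  const proxied = PROXY_TOOLS_SKETCH.has(name);
  return {
    name,
    supportedInWorker: !proxied,
    workerRoutingHint: proxied ? 'main-process-proxy' : 'worker-local',
  };
}

console.log(toManifestItem('mysql_query'));
console.log(toManifestItem('grep'));
```

One plausible reason for proxying, stated here as an assumption rather than a documented design decision, is that the connection pool and decrypted credentials stay in the main process, so subagent workers never hold database secrets.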

Modify `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`:

- Import services and factories:

```ts
import { NetaClawDataSourceService } from './data_source.js';
import { MysqlIntrospectionService } from './mysql_schema.js';
import { MysqlQueryService } from './mysql_query.js';
import {
  createMysqlListSourcesTool,
  createMysqlQueryTool,
  createMysqlSchemaTool,
  createMysqlTableSampleTool,
} from '../tools/builtin/mysql.js';
```

- Add injected services:

```ts
@Inject()
dataSourceService: NetaClawDataSourceService;

@Inject()
mysqlIntrospection: MysqlIntrospectionService;

@Inject()
mysqlQuery: MysqlQueryService;
```

- Add to `getBaseToolMap()`:

```ts
['mysql_list_sources', createMysqlListSourcesTool({ dataSourceService: this.dataSourceService })],
['mysql_schema', createMysqlSchemaTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_table_sample', createMysqlTableSampleTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_query', createMysqlQueryTool({ dataSourceService: this.dataSourceService, mysqlQuery: this.mysqlQuery })],
```

- [ ] **Step 5: Run resolver tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts
```

Expected: PASS.

- [ ] **Step 6: Commit**

```bash
git add packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts packages/backend/src/modules/netaclaw/tools/catalog.ts packages/backend/src/modules/netaclaw/tools/manifest.ts packages/backend/src/modules/netaclaw/service/tool_resolver.ts packages/backend/test/tool_resolver.test.ts
git commit -m "feat: register mysql agent tools"
```

---

## Task 6: Admin Controller And Prompt Skill

**Files:**

- Create: `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`
- Create: `packages/backend/skills/data-analyst-mysql/SKILL.md`

- [ ] **Step 1: Implement admin controller**

Create `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`:

```ts
import { Body, Controller, Get, Inject, Post, Provide, Query } from '@midwayjs/core';
import { NetaClawDataSourceService } from '../../service/data_source.js';

@Provide()
@Controller('/admin/netaclaw/data-source')
export class NetaClawDataSourceAdminController {
  @Inject()
  dataSourceService: NetaClawDataSourceService;

  @Get('/list')
  async list(@Query('agentId') agentId?: number) {
    if (agentId) {
      return { code: 1000, data: await this.dataSourceService.listForAgent(Number(agentId)) };
    }
    return { code: 1000, data: await this.dataSourceService.listAdminSafe() };
  }

  @Post('/save')
  async save(@Body() body: any) {
    return { code: 1000, data: await this.dataSourceService.saveConfig(body) };
  }

  @Post('/delete')
  async delete(@Body() body: { id: number }) {
    await this.dataSourceService.delete(Number(body.id));
    return { code: 1000, message: 'success' };
  }

  @Post('/test')
  async test(@Body() body: any) {
    const result = await this.dataSourceService.testConnection(body);
    return result.ok
      ? { code: 1000, data: result }
      : { code: 1001, message: result.error || 'connection_failed', data: result };
  }
}
```

- [ ] **Step 2: Add prompt skill**

Create `packages/backend/skills/data-analyst-mysql/SKILL.md`:

```md
---
name: data-analyst-mysql
description: Use the MySQL tools for read-only intelligent data questioning, including schema analysis, cross-table JOINs, metric-definition clarification, and SQL result explanation.
version: 1.0.0
metadata:
  skillType: llm
  tags: [mysql, data-analysis, question-answering]
  conditions:
    requires_tools: ["mysql_list_sources", "mysql_schema", "mysql_query"]
---

# MySQL Intelligent Data Questioning

You answer business data questions using the MySQL tools. All database access must go through the `mysql_*` tools.

## Workflow

1. Call `mysql_list_sources` first to see which data sources the current Agent can use.
2. Choose a data source based on the user's question; if you cannot tell which one applies, ask for clarification first.
3. Call `mysql_schema` to read the relevant table structures, columns, primary keys, indexes, and foreign keys.
4. For cross-table JOINs, prefer foreign keys; when there is no foreign key, infer the relationship from column names, types, indexes, and the sample values from `mysql_table_sample`.
5. When the business metric definition is unclear, you must clarify it first. Do not guess.
6. Call only `mysql_query`, and only to execute read-only SELECT statements.

## JOIN Rules

- Every JOIN must have an explicit `ON` or `USING` condition.
- When unsure about the join columns, explain the join you intend to use and ask the user first.
- If a join is based on inference, the final answer must state that "this join is inferred".
- When a query times out or is rejected, narrow the time range, reduce the number of tables, or ask the user for clarification.

## Answer Format

Every answer must include:

- The conclusion.
- The SQL.
- Definition notes, especially the time range, filter conditions, and JOIN columns.
- Limitation notes, such as LIMIT, sampling, masked columns, inferred joins, or missing columns.

Do not expose the database host, account, password, connection string, or any information about unauthorized tables.
Do not promise "full statistics" unless the SQL definition and filter scope are explicit.
```
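
The `requires_tools` condition in the frontmatter is meant to gate the skill on the presence of MySQL tools. As a rough sketch of what such gating could look like, assuming the skill loader compares the declared tool names with the agent's resolved tool names, consider the following. `SkillConditionsSketch` and `isSkillAvailable` are hypothetical names; the actual loader's types and matching semantics are not shown in this plan.

```typescript
// Hypothetical shape of the parsed `conditions` frontmatter block.
interface SkillConditionsSketch {
  requires_tools?: string[];
}

// A skill is treated as available only when every required tool is
// among the agent's resolved tools. No conditions means always available.
function isSkillAvailable(
  conditions: SkillConditionsSketch | undefined,
  resolvedToolNames: Iterable<string>
): boolean {
  const available = new Set<string>(resolvedToolNames);
  const required = conditions?.requires_tools ?? [];
  return required.every(name => available.has(name));
}

const analystConditions: SkillConditionsSketch = {
  requires_tools: ['mysql_list_sources', 'mysql_schema', 'mysql_query'],
};

console.log(isSkillAvailable(analystConditions, ['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']));
console.log(isSkillAvailable(analystConditions, ['mysql_query']));
```

Note that `mysql_table_sample` is not listed in `requires_tools`, so under this reading the skill remains available to an agent that has that one tool disabled.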

- [ ] **Step 3: Run build or focused tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts test/skill_env_schema.test.ts
```

Expected: PASS.

- [ ] **Step 4: Commit**

```bash
git add packages/backend/src/modules/netaclaw/controller/admin/data_source.ts packages/backend/skills/data-analyst-mysql/SKILL.md
git commit -m "feat: add mysql data source admin api and analyst skill"
```

---

## Task 7: Verification And Documentation Sync

**Files:**

- Modify: `docs/code-wiki/entities/tool-system.md`
- Modify: `docs/code-wiki/entities/skill-system.md`
- Modify: `docs/code-wiki/entities/netaclaw-module.md`
- Modify: `docs/code-wiki/comparisons/database-entity-overview.md`

- [ ] **Step 1: Update code wiki**

Add concise references:

- `tool-system.md`: add `mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query` under a `MySQL Question Answering` toolset.
- `skill-system.md`: add `data-analyst-mysql` as a prompt skill.
- `netaclaw-module.md`: increment table count and list `netaclaw_data_source`, `netaclaw_data_source_query_audit`.
- `database-entity-overview.md`: add field summary for both new tables.

- [ ] **Step 2: Run all focused backend tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/mysql_sql_guard.test.ts test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_pool_manager.test.ts test/tool_resolver.test.ts test/entity_exports.test.ts
```

Expected: PASS.

- [ ] **Step 3: Run backend build**

Run:

```bash
pnpm --filter @neta/backend build
```

Expected: exit code 0.

- [ ] **Step 4: Inspect final diff**

Run:

```bash
git status --short
git diff --stat
```

Expected: only MySQL question answering implementation files and docs are changed.

- [ ] **Step 5: Commit**

```bash
git add docs/code-wiki packages/backend
git commit -m "docs: update code wiki for mysql question answering"
```

---

## Final Verification Checklist

- [ ] SQL guard rejects `SHOW`, `DESCRIBE`, `EXPLAIN`, comments, semicolons, DML, DDL, `UNION`, CTE, user variables, dangerous functions, blocked tables, unallowed tables, missing JOIN condition, too many JOIN tables.
- [ ] Data source password is AES-GCM encrypted; admin APIs never return password ciphertext/plaintext, and `mysql_list_sources` never returns host, username, password, connection string, or permission details.
- [ ] `mysql_schema` returns only authorized table metadata and marks masked columns.
- [ ] `mysql_table_sample` validates table and column identifiers against schema plus allowlist/blocklist, and rejects masked columns.
- [ ] `mysql_query` rejects masked column references, explicit LIMIT above `maxRows`, missing per-JOIN `ON/USING`, blocked tables, and unallowed tables before querying MySQL.
- [ ] `mysql_query` writes audit for success, rejected, and failed SQL.
- [ ] MySQL tools are visible only when selected by `mysql` toolset or explicit tool config.
- [ ] Subagent worker route for MySQL tools is `main-process-proxy`, not `worker-local`.
- [ ] `data-analyst-mysql` skill is discoverable and condition-gated by MySQL tools.
- [ ] Focused tests pass.
- [ ] Backend build passes.
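
The checklist item about `mysql_list_sources` never returning host, username, password, or connection details is easiest to satisfy with an allowlist projection: copy only the fields that are safe rather than deleting the sensitive ones. The sketch below illustrates the idea under assumptions. The field names in `DataSourceRowSketch` and the helper `toSafeSummary` are hypothetical and may not match the real `netaclaw_data_source` entity or `listForAgent` implementation.

```typescript
// Hypothetical entity shape; real column names may differ.
interface DataSourceRowSketch {
  id: number;
  name: string;
  description: string | null;
  host: string;
  port: number;
  username: string;
  passwordCipher: string;
}

// Only these fields are ever exposed to the Agent.
interface SafeSourceSummary {
  name: string;
  description: string | null;
}

// Allowlist projection: fields added to the entity later are not
// exposed unless they are explicitly copied here.
function toSafeSummary(row: DataSourceRowSketch): SafeSourceSummary {
  return { name: row.name, description: row.description };
}

const exampleRow: DataSourceRowSketch = {
  id: 1,
  name: 'orders_db',
  description: 'Order data',
  host: 'db.internal.example',
  port: 3306,
  username: 'readonly',
  passwordCipher: 'ciphertext-placeholder',
};

console.log(JSON.stringify(toSafeSummary(exampleRow)));
```

A unit test can then assert on the exact key set of the summary, which fails loudly if a sensitive field is ever added to the projection.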