GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-15-mysql-question-answering.md
2026-05-20 21:39:12 +08:00

2042 lines
69 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# MySQL Question Answering Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build a MySQL-only, read-only intelligent data questioning backend path for Neta Agents, with safe data-source configuration, schema/sample/query tools, cross-table JOIN support, audit, and a prompt skill.
**Architecture:** Add a `mysql` toolset backed by `netaclaw_data_source`, `SecretCryptoService`, `MysqlPoolManager`, schema/query services, and guarded Tool implementations. Security-critical rules live in services and tools; `data-analyst-mysql` only guides Agent behavior.
**Tech Stack:** Midway.js, TypeORM, mysql2/promise, TypeBox tool schemas, Jest, NetaClaw Tool Catalog/Resolver/Runtime Policy, prompt Skill.
---
## File Structure
Create:
- `packages/backend/src/modules/netaclaw/entity/data_source.ts` — MySQL data-source configuration entity.
- `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts` — query audit entity.
- `packages/backend/src/modules/netaclaw/service/secret_crypto.ts` — shared AES-256-GCM helper for new encrypted secrets.
- `packages/backend/src/modules/netaclaw/service/data_source.ts` — admin/runtime data-source lookup, authorization, save, and test connection service.
- `packages/backend/src/modules/netaclaw/service/mysql_pool.ts` — cached mysql2 pool manager.
- `packages/backend/src/modules/netaclaw/service/mysql_schema.ts` — information_schema mapping and schema/sample helpers.
- `packages/backend/src/modules/netaclaw/service/mysql_query.ts` — read-only SQL execution, masked-column rejection, row limits, audit.
- `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts``mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query`.
- `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts` — admin APIs for data-source list/save/test/delete.
- `packages/backend/skills/data-analyst-mysql/SKILL.md` — prompt skill for MySQL data analysis.
- `packages/backend/test/secret_crypto.test.ts`
- `packages/backend/test/mysql_sql_guard.test.ts`
- `packages/backend/test/mysql_schema_mapper.test.ts`
- `packages/backend/test/mysql_query_service.test.ts`
- `packages/backend/test/mysql_pool_manager.test.ts`
- `packages/backend/test/data_source_projection.test.ts`
Modify:
- `packages/backend/src/entities.ts` — add data source and query audit entity imports to the generated entity list. This file is marked generated, but existing tests assert it directly, so update it for this change.
- `packages/backend/src/modules/netaclaw/tools/catalog.ts` — add `TOOLSET_MYSQL` and import `./builtin/mysql.js`.
- `packages/backend/src/modules/netaclaw/tools/manifest.ts` — route `mysql_*` tools as main-process proxy, classify as custom/text/parallel.
- `packages/backend/src/modules/netaclaw/service/tool_resolver.ts` — inject MySQL tool dependencies into runtime tool map.
- `packages/backend/test/entity_exports.test.ts` — assert new entities are exported.
- `packages/backend/test/tool_resolver.test.ts` — assert mysql toolset resolution and subagent route.
Do not implement frontend UI in this pass. The admin controller is enough for Phase 1/2 backend configuration.
---
## Task 1: Entities And Secret Crypto
**Files:**
- Create: `packages/backend/src/modules/netaclaw/entity/data_source.ts`
- Create: `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`
- Create: `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`
- Modify: `packages/backend/src/entities.ts`
- Modify: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/secret_crypto.test.ts`
- Test: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/data_source_projection.test.ts`
- [ ] **Step 1: Write the failing secret crypto test**
Add `packages/backend/test/secret_crypto.test.ts`:
```ts
import { SecretCryptoService } from '../src/modules/netaclaw/service/secret_crypto.js';
describe('SecretCryptoService', () => {
const originalNetaSecret = process.env.NETA_SECRET_KEY;
const originalAppSecret = process.env.APP_SECRET;
beforeEach(() => {
process.env.NETA_SECRET_KEY = 'unit-test-secret-key';
delete process.env.APP_SECRET;
});
afterEach(() => {
if (originalNetaSecret === undefined) delete process.env.NETA_SECRET_KEY;
else process.env.NETA_SECRET_KEY = originalNetaSecret;
if (originalAppSecret === undefined) delete process.env.APP_SECRET;
else process.env.APP_SECRET = originalAppSecret;
});
it('encrypts text without embedding the plaintext', () => {
const service = new SecretCryptoService();
const encrypted = service.encryptText('mysql-password-123');
const envelope = JSON.parse(Buffer.from(encrypted, 'base64').toString('utf8'));
expect(encrypted).not.toContain('mysql-password-123');
expect(envelope).toEqual(expect.objectContaining({ v: 1, alg: 'aes-256-gcm' }));
expect(envelope.iv).toEqual(expect.any(String));
expect(envelope.tag).toEqual(expect.any(String));
expect(envelope.ct).toEqual(expect.any(String));
expect(service.decryptText(encrypted)).toBe('mysql-password-123');
});
it('encrypts and decrypts JSON values', () => {
const service = new SecretCryptoService();
const encrypted = service.encryptJson({ password: 'secret', token: 'abc' });
expect(encrypted).not.toContain('secret');
expect(service.decryptJson(encrypted)).toEqual({ password: 'secret', token: 'abc' });
});
it('rejects tampered ciphertext', () => {
const service = new SecretCryptoService();
const encrypted = service.encryptText('secret');
const tampered = encrypted.slice(0, -2) + 'xx';
expect(() => service.decryptText(tampered)).toThrow();
});
});
```
- [ ] **Step 2: Run test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```
Expected: FAIL because `secret_crypto.ts` does not exist.
- [ ] **Step 3: Implement `SecretCryptoService`**
Create `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`:
```ts
import { Provide, Scope, ScopeEnum } from '@midwayjs/core';
import * as crypto from 'crypto';
@Provide()
@Scope(ScopeEnum.Singleton)
export class SecretCryptoService {
private readonly algorithm = 'aes-256-gcm' as const;
private readonly ivLength = 12;
private deriveKey(): Buffer {
const raw = process.env.NETA_SECRET_KEY || process.env.APP_SECRET;
if (!raw) {
throw new Error('NETA_SECRET_KEY or APP_SECRET is required for secret encryption');
}
return crypto.createHash('sha256').update(raw).digest();
}
encryptText(value: string): string {
const iv = crypto.randomBytes(this.ivLength);
const cipher = crypto.createCipheriv(this.algorithm, this.deriveKey(), iv);
const encrypted = Buffer.concat([cipher.update(value, 'utf8'), cipher.final()]);
const tag = cipher.getAuthTag();
return Buffer.from(JSON.stringify({
v: 1,
alg: this.algorithm,
iv: iv.toString('base64'),
tag: tag.toString('base64'),
ct: encrypted.toString('base64'),
}), 'utf8').toString('base64');
}
decryptText(ciphertext: string): string {
const envelope = JSON.parse(Buffer.from(ciphertext, 'base64').toString('utf8'));
if (!envelope || envelope.v !== 1 || envelope.alg !== this.algorithm) {
throw new Error('Unsupported ciphertext envelope');
}
const iv = Buffer.from(envelope.iv, 'base64');
const tag = Buffer.from(envelope.tag, 'base64');
const encrypted = Buffer.from(envelope.ct, 'base64');
const decipher = crypto.createDecipheriv(this.algorithm, this.deriveKey(), iv);
decipher.setAuthTag(tag);
return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8');
}
encryptJson(value: Record<string, string>): string {
return this.encryptText(JSON.stringify(value));
}
decryptJson(ciphertext: string): Record<string, string> {
const parsed = JSON.parse(this.decryptText(ciphertext));
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
throw new Error('Invalid encrypted JSON payload');
}
return Object.fromEntries(Object.entries(parsed).map(([key, value]) => [key, String(value)]));
}
}
```
- [ ] **Step 4: Run test to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```
Expected: PASS.
- [ ] **Step 5: Write failing entity export test**
Modify `packages/backend/test/entity_exports.test.ts`:
```ts
import { entities } from '../src/entities.js';
import { NetaClawAgentSessionEntity } from '../src/modules/netaclaw/entity/agent_session.js';
import { NetaClawAgentSessionEntryEntity } from '../src/modules/netaclaw/entity/agent_session_entry.js';
import { NetaClawDataSourceEntity } from '../src/modules/netaclaw/entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../src/modules/netaclaw/entity/data_source_query_audit.js';
describe('entities exports', () => {
it('includes SubagentSession entity', () => {
const entityNames = entities
.map(e => (typeof e === 'function' ? e.name : ''))
.filter(Boolean);
expect(entityNames.some(n => n.includes('SubagentSession'))).toBe(true);
});
it('includes session-tree MySQL entities', () => {
expect(entities).toContain(NetaClawAgentSessionEntity);
expect(entities).toContain(NetaClawAgentSessionEntryEntity);
});
it('includes MySQL question answering entities', () => {
expect(entities).toContain(NetaClawDataSourceEntity);
expect(entities).toContain(NetaClawDataSourceQueryAuditEntity);
});
});
```
- [ ] **Step 6: Run test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/entity_exports.test.ts
```
Expected: FAIL because entity files do not exist.
- [ ] **Step 7: Implement data source entities**
Create `packages/backend/src/modules/netaclaw/entity/data_source.ts`:
```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';
export interface NetaClawDataSourceExtra {
ssl?: boolean | Record<string, unknown>;
connectTimeout?: number;
queryTimeoutMs?: number;
maxRows?: number;
allowedTables?: string[];
blockedTables?: string[];
maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
schemaVisibility?: 'allowed-only' | 'all-names-only';
maxJoinTables?: number;
poolConnectionLimit?: number;
}
@Entity('netaclaw_data_source')
export class NetaClawDataSourceEntity extends BaseEntity {
@Index({ unique: true })
@Column({ comment: '数据源唯一名', length: 100 })
name: string;
@Column({ comment: '显示名称', length: 200, nullable: true })
label: string;
@Index()
@Column({ comment: '数据源类型', length: 20, default: 'mysql' })
type: string;
@Column({ comment: 'MySQL host', length: 255 })
host: string;
@Column({ comment: 'MySQL port', default: 3306 })
port: number;
@Column({ comment: '默认数据库', length: 128 })
database: string;
@Column({ comment: '连接用户名', length: 255 })
username: string;
@Column({ comment: '加密后的连接密码', type: 'text', nullable: true })
passwordEncrypted: string;
@Column({ comment: '是否只读', default: 1 })
readonly: number;
@Index()
@Column({ comment: '状态 0=禁用 1=启用', default: 1 })
status: number;
@Column({ comment: '授权 Agent ID 列表', type: 'json', nullable: true })
allowedAgentIds: number[];
@Column({ comment: '扩展配置', type: 'json', nullable: true })
extra: NetaClawDataSourceExtra;
}
```
Create `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`:
```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';
@Entity('netaclaw_data_source_query_audit')
export class NetaClawDataSourceQueryAuditEntity extends BaseEntity {
@Index()
@Column({ comment: '数据源 ID' })
dataSourceId: number;
@Index()
@Column({ comment: 'Agent ID', nullable: true })
agentId: number;
@Index()
@Column({ comment: '用户 ID', nullable: true })
userId: number;
@Column({ comment: '工具调用 ID', length: 100, nullable: true })
toolCallId: string;
@Column({ comment: 'SQL SHA-256', length: 64 })
sqlHash: string;
@Column({ comment: 'SQL 摘要', type: 'text', nullable: true })
sqlPreview: string;
@Index()
@Column({ comment: '状态 success/rejected/failed', length: 20 })
status: string;
@Column({ comment: '拒绝原因', length: 100, nullable: true })
rejectReason: string;
@Column({ comment: '耗时 ms', nullable: true })
elapsedMs: number;
@Column({ comment: '返回行数', nullable: true })
rowCount: number;
@Column({ comment: '错误代码', length: 50, nullable: true })
errorCode: string;
}
```
Modify `packages/backend/src/entities.ts` by adding imports after existing netaclaw imports:
```ts
import * as entity51 from './modules/netaclaw/entity/data_source';
import * as entity52 from './modules/netaclaw/entity/data_source_query_audit';
```
and append:
```ts
...Object.values(entity51),
...Object.values(entity52),
```
- [ ] **Step 8: Run entity and crypto tests to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts
```
Expected: PASS.
- [ ] **Step 9: Write failing data source projection test**
Add `packages/backend/test/data_source_projection.test.ts`:
```ts
import { NetaClawDataSourceService } from '../src/modules/netaclaw/service/data_source.js';
describe('NetaClawDataSourceService projections', () => {
const source = {
id: 1,
name: 'sales_prod',
label: '销售库',
type: 'mysql',
host: '127.0.0.1',
port: 3306,
database: 'sales',
username: 'root',
passwordEncrypted: 'ciphertext',
readonly: 1,
status: 1,
allowedAgentIds: [9],
extra: { allowedTables: ['orders'] },
} as any;
it('admin projection hides passwords but keeps connection metadata', () => {
const service = new NetaClawDataSourceService();
expect(service.toAdminSafe(source)).toEqual(expect.objectContaining({
name: 'sales_prod',
host: '127.0.0.1',
username: 'root',
hasPassword: true,
}));
expect(service.toAdminSafe(source)).not.toHaveProperty('passwordEncrypted');
});
it('agent projection hides host username password and permission details', () => {
const service = new NetaClawDataSourceService();
expect(service.toAgentSummary(source)).toEqual({
name: 'sales_prod',
label: '销售库',
database: 'sales',
status: 1,
});
});
});
```
- [ ] **Step 10: Run projection test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/data_source_projection.test.ts
```
Expected: FAIL because `toAdminSafe` and `toAgentSummary` do not exist.
- [ ] **Step 11: Implement safe projections**
Modify the service code in this task:
```ts
export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
hasPassword: boolean;
};
export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;
```
Replace any existing unsafe projection helper with:
```ts
toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
const { passwordEncrypted, ...rest } = entity as any;
return { ...rest, hasPassword: !!passwordEncrypted };
}
toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
return {
name: entity.name,
label: entity.label,
database: entity.database,
status: entity.status,
};
}
```
Update `listAdminSafe()` and `listForAgent()` to return the right projections:
```ts
async listAdminSafe(): Promise<AdminSafeDataSource[]> {
const rows = await this.repo.find({ order: { id: 'DESC' } as any });
return rows.map(row => this.toAdminSafe(row));
}
async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
return rows
.filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
.map(row => this.toAgentSummary(row));
}
```
Update `saveConfig()` to return `toAdminSafe(saved)`.
- [ ] **Step 12: Run tests to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts test/data_source_projection.test.ts
```
Expected: PASS.
- [ ] **Step 13: Commit**
```bash
git add packages/backend/src/modules/netaclaw/entity/data_source.ts packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts packages/backend/src/modules/netaclaw/service/secret_crypto.ts packages/backend/src/entities.ts packages/backend/test/secret_crypto.test.ts packages/backend/test/entity_exports.test.ts packages/backend/test/data_source_projection.test.ts
git commit -m "feat: add mysql data source entities and secret crypto"
```
---
## Task 2: SQL Guard
**Files:**
- Create: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_sql_guard.test.ts`
- [ ] **Step 1: Write failing SQL guard tests**
Add `packages/backend/test/mysql_sql_guard.test.ts`:
```ts
import { validateMysqlReadOnlySql } from '../src/modules/netaclaw/service/mysql_query.js';
describe('validateMysqlReadOnlySql', () => {
const baseOptions = {
allowedTables: ['orders', 'customers'],
blockedTables: [],
maxJoinTables: 4,
maxRows: 200,
maskedColumns: {},
};
it('accepts a SELECT with an explicit JOIN condition', () => {
expect(validateMysqlReadOnlySql(
'SELECT c.name, COUNT(*) FROM customers c JOIN orders o ON o.customer_id = c.id GROUP BY c.name LIMIT 20',
baseOptions
)).toEqual(expect.objectContaining({
ok: true,
tables: expect.arrayContaining(['customers', 'orders']),
}));
});
it.each([
'SHOW TABLES',
'DESCRIBE orders',
'EXPLAIN SELECT * FROM orders',
'UPDATE orders SET amount = 1',
'DELETE FROM orders',
'DROP TABLE orders',
'SELECT * FROM orders; SELECT * FROM customers',
'SELECT * FROM orders -- comment',
'SELECT * FROM orders # comment',
'SELECT * FROM orders /* comment */',
'WITH recent AS (SELECT * FROM orders) SELECT * FROM recent',
'SELECT * FROM orders UNION SELECT * FROM customers',
'SELECT @x := id FROM orders',
'SELECT SLEEP(1)',
'SELECT * INTO OUTFILE "/tmp/a" FROM orders',
])('rejects unsafe SQL: %s', sql => {
const result = validateMysqlReadOnlySql(sql, baseOptions);
expect(result.ok).toBe(false);
});
it('rejects unallowed tables', () => {
const result = validateMysqlReadOnlySql('SELECT * FROM payments LIMIT 10', baseOptions);
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'table_not_allowed',
}));
});
it('rejects blocked tables even when allowed', () => {
const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10', {
allowedTables: ['orders'],
blockedTables: ['orders'],
maxJoinTables: 4,
});
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'table_blocked',
}));
});
it('rejects explicit joins without ON or USING', () => {
const result = validateMysqlReadOnlySql(
'SELECT * FROM customers JOIN orders LIMIT 10',
baseOptions
);
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'join_condition_required',
}));
});
it('rejects more joined tables than allowed', () => {
const result = validateMysqlReadOnlySql(
'SELECT * FROM a JOIN b ON b.a_id = a.id JOIN c ON c.b_id = b.id LIMIT 10',
{ allowedTables: ['a', 'b', 'c'], blockedTables: [], maxJoinTables: 2, maxRows: 200, maskedColumns: {} }
);
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'too_many_join_tables',
}));
});
it('rejects a later JOIN that lacks its own ON or USING clause', () => {
const result = validateMysqlReadOnlySql(
'SELECT * FROM customers c JOIN orders o ON o.customer_id = c.id JOIN invoices i LIMIT 10',
{ allowedTables: ['customers', 'orders', 'invoices'], blockedTables: [], maxJoinTables: 4, maxRows: 200, maskedColumns: {} }
);
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'join_condition_required',
}));
});
it('rejects explicit LIMIT values above maxRows', () => {
const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10000', baseOptions);
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'limit_exceeds_max_rows',
}));
});
it('rejects SQL that references masked columns', () => {
const result = validateMysqlReadOnlySql('SELECT email AS e FROM customers LIMIT 10', {
allowedTables: ['customers'],
blockedTables: [],
maxJoinTables: 4,
maxRows: 200,
maskedColumns: { 'customers.email': 'redact' },
});
expect(result).toEqual(expect.objectContaining({
ok: false,
reason: 'masked_column_denied',
}));
});
});
```
- [ ] **Step 2: Run test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```
Expected: FAIL because `mysql_query.ts` does not export the guard.
- [ ] **Step 3: Implement SQL guard**
Create the initial `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with these exports:
```ts
export type MysqlSqlGuardOptions = {
allowedTables?: string[];
blockedTables?: string[];
maxJoinTables?: number;
maxRows?: number;
maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
};
export type MysqlSqlGuardResult =
| { ok: true; tables: string[]; limitedSql: string }
| { ok: false; reason: string; tables?: string[] };
const DENIED_PATTERNS: Array<[RegExp, string]> = [
[/;/, 'multiple_statements_denied'],
[/(--|#|\/\*|\*\/)/, 'comments_denied'],
[/^\s*(show|describe|desc|explain)\b/i, 'metadata_sql_denied'],
[/^\s*(insert|update|delete|replace|create|alter|drop|truncate|call|grant|revoke|start|begin|commit|rollback)\b/i, 'non_select_denied'],
[/\bunion\b/i, 'union_denied'],
[/^\s*with\b/i, 'cte_denied'],
[/@[a-zA-Z0-9_]+/, 'user_variable_denied'],
[/\btemporary\b/i, 'temporary_table_denied'],
[/\b(load_file|sleep|benchmark)\s*\(/i, 'dangerous_function_denied'],
[/\binto\s+(out|dump)?file\b/i, 'file_output_denied'],
[/\bfrom\s+[^,\s]+(?:\s+\w+)?\s*,\s*[^,\s]+/i, 'implicit_join_denied'],
];
function normalizeIdentifier(value: string): string {
return value.replace(/`/g, '').split('.').pop()!.toLowerCase();
}
function stripBackticks(value: string): string {
return value.replace(/`/g, '').toLowerCase();
}
export function extractMysqlTables(sql: string): string[] {
const tables = new Set<string>();
const tablePattern = /\b(?:from|join)\s+(`?[a-zA-Z0-9_]+`?(?:\.`?[a-zA-Z0-9_]+`?)?)/gi;
let match: RegExpExecArray | null;
while ((match = tablePattern.exec(sql))) {
tables.add(normalizeIdentifier(match[1]));
}
return [...tables];
}
export function validateMysqlReadOnlySql(sql: string, options: MysqlSqlGuardOptions = {}): MysqlSqlGuardResult {
const trimmed = String(sql || '').trim();
if (!trimmed) return { ok: false, reason: 'sql_empty' };
if (!/^\s*select\b/i.test(trimmed)) return { ok: false, reason: 'only_select_allowed' };
for (const [pattern, reason] of DENIED_PATTERNS) {
if (pattern.test(trimmed)) return { ok: false, reason };
}
if (!everyJoinHasCondition(trimmed)) {
return { ok: false, reason: 'join_condition_required' };
}
const tables = extractMysqlTables(trimmed);
const allowed = new Set((options.allowedTables || []).map(item => normalizeIdentifier(item)));
const blocked = new Set((options.blockedTables || []).map(item => normalizeIdentifier(item)));
for (const table of tables) {
if (blocked.has(table)) return { ok: false, reason: 'table_blocked', tables };
if (allowed.size === 0 || !allowed.has(table)) return { ok: false, reason: 'table_not_allowed', tables };
}
const maxJoinTables = Math.max(1, Math.min(options.maxJoinTables ?? 4, 6));
if (tables.length > maxJoinTables) {
return { ok: false, reason: 'too_many_join_tables', tables };
}
const maxRows = Math.max(1, Math.min(options.maxRows ?? 200, 1000));
const explicitLimit = /\blimit\s+(\d+)/i.exec(trimmed);
if (explicitLimit && Number(explicitLimit[1]) > maxRows) {
return { ok: false, reason: 'limit_exceeds_max_rows', tables };
}
const normalizedSql = stripBackticks(trimmed);
for (const key of Object.keys(options.maskedColumns || {})) {
const [table, column] = key.toLowerCase().split('.');
if (tables.includes(table) && new RegExp(`\\b${column}\\b`, 'i').test(normalizedSql)) {
return { ok: false, reason: 'masked_column_denied', tables };
}
}
const limitedSql = /\blimit\s+\d+/i.test(trimmed)
? trimmed
: `SELECT * FROM (${trimmed}) AS neta_limited_query LIMIT ?`;
return { ok: true, tables, limitedSql };
}
function everyJoinHasCondition(sql: string): boolean {
const joinPattern = /\bjoin\b/gi;
let match: RegExpExecArray | null;
while ((match = joinPattern.exec(sql))) {
const rest = sql.slice(match.index + match[0].length);
const boundary = /\b(join|where|group\s+by|order\s+by|limit)\b/i.exec(rest);
const segment = boundary ? rest.slice(0, boundary.index) : rest;
if (!/\b(on|using)\b/i.test(segment)) return false;
}
return true;
}
```
- [ ] **Step 4: Run test to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_sql_guard.test.ts
git commit -m "feat: add mysql read only sql guard"
```
---
## Task 3: Data Source Service And Pool Manager
**Files:**
- Create: `packages/backend/src/modules/netaclaw/service/data_source.ts`
- Create: `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_pool_manager.test.ts`
- [ ] **Step 1: Write failing pool manager tests**
Add `packages/backend/test/mysql_pool_manager.test.ts`:
```ts
import { MysqlPoolManager } from '../src/modules/netaclaw/service/mysql_pool.js';
describe('MysqlPoolManager', () => {
it('reuses pools by data source id and clamps connectionLimit', async () => {
const created: any[] = [];
const createPool = jest.fn((config: any) => {
const pool = { config, end: jest.fn().mockResolvedValue(undefined) };
created.push(pool);
return pool;
});
const manager = new MysqlPoolManager(createPool as any);
const source: any = {
id: 7,
host: 'localhost',
port: 3306,
database: 'sales',
username: 'root',
passwordEncrypted: 'encrypted',
extra: { poolConnectionLimit: 20, connectTimeout: 1000 },
};
manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;
const first = await manager.getPool(source);
const second = await manager.getPool(source);
expect(first).toBe(second);
expect(createPool).toHaveBeenCalledTimes(1);
expect(created[0].config.connectionLimit).toBe(10);
expect(created[0].config.password).toBe('plain');
});
it('closes and forgets a cached pool', async () => {
const end = jest.fn().mockResolvedValue(undefined);
const createPool = jest.fn(() => ({ end }));
const manager = new MysqlPoolManager(createPool as any);
manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;
await manager.getPool({ id: 1, host: 'h', port: 3306, database: 'd', username: 'u', passwordEncrypted: 'p', extra: {} } as any);
await manager.closePool(1);
expect(end).toHaveBeenCalledTimes(1);
});
});
```
- [ ] **Step 2: Run test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```
Expected: FAIL because `mysql_pool.ts` does not exist.
- [ ] **Step 3: Implement pool manager**
Create `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`:
```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import mysql from 'mysql2/promise';
import type { Pool } from 'mysql2/promise';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';
type CreatePool = typeof mysql.createPool;
@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlPoolManager {
@Inject()
secretCrypto: SecretCryptoService;
private pools = new Map<number, Pool>();
private readonly createPool: CreatePool;
constructor(createPool: CreatePool = mysql.createPool) {
this.createPool = createPool;
}
async getPool(source: NetaClawDataSourceEntity): Promise<Pool> {
const existing = this.pools.get(source.id);
if (existing) return existing;
const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 3), 10));
const pool = this.createPool({
host: source.host,
port: source.port || 3306,
database: source.database,
user: source.username,
password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
waitForConnections: true,
connectionLimit: limit,
connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
ssl: source.extra?.ssl as any,
});
this.pools.set(source.id, pool);
return pool;
}
async closePool(sourceId: number): Promise<void> {
const pool = this.pools.get(sourceId);
if (!pool) return;
this.pools.delete(sourceId);
await pool.end();
}
createTransientPool(source: NetaClawDataSourceEntity): Pool {
const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 1), 3));
return this.createPool({
host: source.host,
port: source.port || 3306,
database: source.database,
user: source.username,
password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
waitForConnections: true,
connectionLimit: limit,
connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
ssl: source.extra?.ssl as any,
});
}
}
```
- [ ] **Step 4: Run pool tests to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```
Expected: PASS.
- [ ] **Step 5: Implement `DataSourceService`**
Create `packages/backend/src/modules/netaclaw/service/data_source.ts`:
```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { NetaClawDataSourceEntity, NetaClawDataSourceExtra } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';
import { MysqlPoolManager } from './mysql_pool.js';
export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
hasPassword: boolean;
};
export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;
export type SaveDataSourceInput = Partial<NetaClawDataSourceEntity> & {
password?: string;
};
@Provide()
@Scope(ScopeEnum.Singleton)
export class NetaClawDataSourceService {
@InjectEntityModel(NetaClawDataSourceEntity)
repo: Repository<NetaClawDataSourceEntity>;
@Inject()
secretCrypto: SecretCryptoService;
@Inject()
mysqlPoolManager: MysqlPoolManager;
toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
const { passwordEncrypted, ...rest } = entity as any;
return { ...rest, hasPassword: !!passwordEncrypted };
}
toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
return {
name: entity.name,
label: entity.label,
database: entity.database,
status: entity.status,
};
}
async listAdminSafe(): Promise<AdminSafeDataSource[]> {
const rows = await this.repo.find({ order: { id: 'DESC' } as any });
return rows.map(row => this.toAdminSafe(row));
}
async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
return rows
.filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
.map(row => this.toAgentSummary(row));
}
async getAuthorizedSource(name: string, agentId: number): Promise<NetaClawDataSourceEntity> {
const source = await this.repo.findOne({ where: { name, type: 'mysql', status: 1 } as any });
if (!source) throw new Error('data_source_not_found');
if (!Array.isArray(source.allowedAgentIds) || !source.allowedAgentIds.includes(agentId)) {
throw new Error('data_source_not_authorized');
}
return source;
}
normalizeExtra(extra?: NetaClawDataSourceExtra | null): NetaClawDataSourceExtra {
return {
allowedTables: extra?.allowedTables ?? [],
blockedTables: extra?.blockedTables ?? [],
maskedColumns: extra?.maskedColumns ?? {},
schemaVisibility: extra?.schemaVisibility ?? 'allowed-only',
queryTimeoutMs: Math.max(1000, Math.min(Number(extra?.queryTimeoutMs ?? 10000), 30000)),
maxRows: Math.max(1, Math.min(Number(extra?.maxRows ?? 200), 1000)),
maxJoinTables: Math.max(1, Math.min(Number(extra?.maxJoinTables ?? 4), 6)),
poolConnectionLimit: Math.max(1, Math.min(Number(extra?.poolConnectionLimit ?? 3), 10)),
ssl: extra?.ssl,
connectTimeout: extra?.connectTimeout,
};
}
async saveConfig(input: SaveDataSourceInput): Promise<AdminSafeDataSource> {
const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
const entity = this.repo.create({
...(existing || {}),
...input,
type: 'mysql',
readonly: 1,
status: input.status ?? existing?.status ?? 1,
port: Number(input.port ?? existing?.port ?? 3306),
allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
extra: this.normalizeExtra(input.extra),
});
if (input.password) {
entity.passwordEncrypted = this.secretCrypto.encryptText(input.password);
}
const saved = await this.repo.save(entity);
await this.mysqlPoolManager.closePool(saved.id);
return this.toAdminSafe(saved);
}
async delete(id: number): Promise<void> {
await this.repo.delete(id);
await this.mysqlPoolManager.closePool(id);
}
async testConnection(input: SaveDataSourceInput): Promise<{ ok: boolean; error?: string }> {
const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
const source = this.repo.create({
...(existing || {}),
...input,
id: input.id ?? -1,
type: 'mysql',
readonly: 1,
port: Number(input.port ?? 3306),
allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : [],
extra: this.normalizeExtra(input.extra),
passwordEncrypted: input.password
? this.secretCrypto.encryptText(input.password)
: input.passwordEncrypted ?? existing?.passwordEncrypted,
} as any);
let pool: any;
try {
pool = this.mysqlPoolManager.createTransientPool(source);
await pool.query('SELECT 1 AS ok');
return { ok: true };
} catch (err: any) {
return { ok: false, error: sanitizeMysqlConnectionError(err) };
} finally {
if (pool) await pool.end();
}
}
}
function sanitizeMysqlConnectionError(err: any): string {
return err?.code || err?.sqlState || err?.errno ? `mysql_connection_failed:${err.code || err.sqlState || err.errno}` : 'mysql_connection_failed';
}
```
- [ ] **Step 6: Run focused tests**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts test/secret_crypto.test.ts
```
Expected: PASS.
- [ ] **Step 7: Commit**
```bash
git add packages/backend/src/modules/netaclaw/service/data_source.ts packages/backend/src/modules/netaclaw/service/mysql_pool.ts packages/backend/test/mysql_pool_manager.test.ts
git commit -m "feat: add mysql data source services"
```
---
## Task 4: Schema Mapping And Query Execution
**Files:**
- Create: `packages/backend/src/modules/netaclaw/service/mysql_schema.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_schema_mapper.test.ts`
- Test: `packages/backend/test/mysql_query_service.test.ts`
- [ ] **Step 1: Write failing schema mapper test**
Add `packages/backend/test/mysql_schema_mapper.test.ts`:
```ts
import { mapMysqlSchemaRows } from '../src/modules/netaclaw/service/mysql_schema.js';
describe('mapMysqlSchemaRows', () => {
it('maps columns, primary keys, indexes and foreign keys', () => {
const result = mapMysqlSchemaRows({
columns: [
{ tableName: 'orders', tableComment: '订单', columnName: 'id', columnType: 'int', isNullable: 'NO', columnKey: 'PRI', columnComment: 'ID' },
{ tableName: 'orders', tableComment: '订单', columnName: 'customer_id', columnType: 'int', isNullable: 'NO', columnKey: 'MUL', columnComment: '客户' },
],
indexes: [
{ tableName: 'orders', indexName: 'PRIMARY', columnName: 'id', nonUnique: 0, seqInIndex: 1 },
{ tableName: 'orders', indexName: 'idx_customer', columnName: 'customer_id', nonUnique: 1, seqInIndex: 1 },
],
foreignKeys: [
{ tableName: 'orders', columnName: 'customer_id', constraintName: 'fk_orders_customer', referencedTableName: 'customers', referencedColumnName: 'id' },
],
maskedColumns: { 'orders.customer_id': 'redact' },
});
expect(result.tables).toEqual([
expect.objectContaining({
name: 'orders',
comment: '订单',
primaryKey: ['id'],
columns: [
expect.objectContaining({ name: 'id', type: 'int', nullable: false, key: 'PRI', masked: false }),
expect.objectContaining({ name: 'customer_id', type: 'int', nullable: false, key: 'MUL', masked: true, maskMode: 'redact' }),
],
indexes: expect.arrayContaining([
expect.objectContaining({ name: 'idx_customer', columns: ['customer_id'], unique: false }),
]),
foreignKeys: [
expect.objectContaining({ column: 'customer_id', referencedTable: 'customers', referencedColumn: 'id' }),
],
}),
]);
});
});
```
- [ ] **Step 2: Run schema mapper test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts
```
Expected: FAIL because `mysql_schema.ts` does not exist.
- [ ] **Step 3: Implement schema mapper and service skeleton**
Create `packages/backend/src/modules/netaclaw/service/mysql_schema.ts` with:
```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { MysqlPoolManager } from './mysql_pool.js';
type MaskMode = 'hash' | 'partial' | 'redact';
export interface MysqlSchemaRows {
columns: Array<{ tableName: string; tableComment?: string; columnName: string; columnType: string; isNullable: string; columnKey?: string; columnComment?: string }>;
indexes: Array<{ tableName: string; indexName: string; columnName: string; nonUnique: number; seqInIndex: number }>;
foreignKeys: Array<{ tableName: string; columnName: string; constraintName: string; referencedTableName: string; referencedColumnName: string }>;
maskedColumns?: Record<string, MaskMode>;
}
export function mapMysqlSchemaRows(rows: MysqlSchemaRows) {
const tableMap = new Map<string, any>();
const getTable = (name: string, comment?: string) => {
if (!tableMap.has(name)) {
tableMap.set(name, { name, comment: comment || '', columns: [], primaryKey: [], indexes: [], foreignKeys: [] });
}
return tableMap.get(name);
};
for (const column of rows.columns) {
const table = getTable(column.tableName, column.tableComment);
const maskKey = `${column.tableName}.${column.columnName}`;
const maskMode = rows.maskedColumns?.[maskKey];
if (column.columnKey === 'PRI') table.primaryKey.push(column.columnName);
table.columns.push({
name: column.columnName,
type: column.columnType,
nullable: column.isNullable === 'YES',
key: column.columnKey || '',
comment: column.columnComment || '',
masked: !!maskMode,
maskMode: maskMode || null,
});
}
const indexGroups = new Map<string, any>();
for (const index of rows.indexes) {
const key = `${index.tableName}.${index.indexName}`;
if (!indexGroups.has(key)) {
indexGroups.set(key, { tableName: index.tableName, name: index.indexName, unique: index.nonUnique === 0, columns: [] as string[] });
}
indexGroups.get(key).columns[index.seqInIndex - 1] = index.columnName;
}
for (const index of indexGroups.values()) {
getTable(index.tableName).indexes.push({ name: index.name, unique: index.unique, columns: index.columns.filter(Boolean) });
}
for (const fk of rows.foreignKeys) {
getTable(fk.tableName).foreignKeys.push({
name: fk.constraintName,
column: fk.columnName,
referencedTable: fk.referencedTableName,
referencedColumn: fk.referencedColumnName,
});
}
return { tables: [...tableMap.values()] };
}
@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlIntrospectionService {
@Inject()
poolManager: MysqlPoolManager;
async listSchema(source: NetaClawDataSourceEntity, options: { tables?: string[] } = {}) {
const allowedTables = source.extra?.allowedTables || [];
const requested = options.tables?.length ? options.tables : allowedTables;
const visibleTables = requested.filter(table => allowedTables.includes(table) && !(source.extra?.blockedTables || []).includes(table));
if (visibleTables.length === 0) return { tables: [] };
const pool = await this.poolManager.getPool(source);
const placeholders = visibleTables.map(() => '?').join(',');
const [columns] = await pool.query(
`SELECT TABLE_NAME AS tableName, '' AS tableComment, COLUMN_NAME AS columnName, COLUMN_TYPE AS columnType, IS_NULLABLE AS isNullable, COLUMN_KEY AS columnKey, COLUMN_COMMENT AS columnComment
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders})
ORDER BY TABLE_NAME, ORDINAL_POSITION`,
[source.database, ...visibleTables]
);
const [indexes] = await pool.query(
`SELECT TABLE_NAME AS tableName, INDEX_NAME AS indexName, COLUMN_NAME AS columnName, NON_UNIQUE AS nonUnique, SEQ_IN_INDEX AS seqInIndex
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders})
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX`,
[source.database, ...visibleTables]
);
const [foreignKeys] = await pool.query(
`SELECT TABLE_NAME AS tableName, COLUMN_NAME AS columnName, CONSTRAINT_NAME AS constraintName, REFERENCED_TABLE_NAME AS referencedTableName, REFERENCED_COLUMN_NAME AS referencedColumnName
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL`,
[source.database, ...visibleTables]
);
return mapMysqlSchemaRows({
columns: columns as MysqlSchemaRows['columns'],
indexes: indexes as MysqlSchemaRows['indexes'],
foreignKeys: foreignKeys as MysqlSchemaRows['foreignKeys'],
maskedColumns: source.extra?.maskedColumns,
});
}
async sampleTable(source: NetaClawDataSourceEntity, options: { table: string; columns?: string[]; limit?: number }) {
const allowedTables = source.extra?.allowedTables || [];
const blockedTables = source.extra?.blockedTables || [];
if (!allowedTables.includes(options.table) || blockedTables.includes(options.table)) {
throw new Error('table_not_allowed');
}
const schema = await this.listSchema(source, { tables: [options.table] });
const table = schema.tables[0];
if (!table) throw new Error('table_not_found');
const visibleColumns = new Set(table.columns.map((column: any) => column.name));
const requestedColumns = options.columns?.length ? options.columns : table.columns.map((column: any) => column.name);
for (const column of requestedColumns) {
if (!/^[A-Za-z0-9_]+$/.test(column) || !visibleColumns.has(column)) {
throw new Error('column_not_allowed');
}
if (source.extra?.maskedColumns?.[`${options.table}.${column}`]) {
throw new Error('masked_column_denied');
}
}
const safeTable = `\`${options.table.replace(/`/g, '')}\``;
const safeColumns = requestedColumns.map(column => `\`${column.replace(/`/g, '')}\``).join(', ');
const limit = Math.max(1, Math.min(Number(options.limit ?? 5), 20));
const pool = await this.poolManager.getPool(source);
const [rows, fields] = await pool.query(`SELECT ${safeColumns} FROM ${safeTable} LIMIT ?`, [limit]);
return {
columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : requestedColumns,
rows: Array.isArray(rows) ? rows : [],
rowCount: Array.isArray(rows) ? rows.length : 0,
truncated: false,
};
}
}
```
- [ ] **Step 4: Run schema mapper test to verify GREEN**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts
```
Expected: PASS.
- [ ] **Step 5: Write failing query service test**
Add `packages/backend/test/mysql_query_service.test.ts`:
```ts
import { MysqlQueryService } from '../src/modules/netaclaw/service/mysql_query.js';
describe('MysqlQueryService', () => {
it('executes a guarded query and writes audit', async () => {
const query = jest.fn().mockResolvedValue([
[
{ id: 1, total: 99 },
],
[{ name: 'id' }, { name: 'total' }],
]);
const service = new MysqlQueryService();
service.poolManager = { getPool: jest.fn().mockResolvedValue({ query }) } as any;
service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;
const result = await service.executeReadOnly({
source: {
id: 1,
database: 'sales',
extra: {
allowedTables: ['orders'],
blockedTables: [],
maskedColumns: {},
maxRows: 200,
maxJoinTables: 4,
queryTimeoutMs: 10000,
},
} as any,
sql: 'SELECT id, total FROM orders LIMIT 10',
agentId: 3,
userId: 5,
toolCallId: 'tool-1',
});
expect(result.rows).toEqual([{ id: 1, total: 99 }]);
expect(result.rowCount).toBe(1);
expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
dataSourceId: 1,
agentId: 3,
userId: 5,
toolCallId: 'tool-1',
status: 'success',
rowCount: 1,
}));
});
it('rejects masked column references before querying mysql', async () => {
const service = new MysqlQueryService();
service.poolManager = { getPool: jest.fn() } as any;
service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;
await expect(service.executeReadOnly({
source: {
id: 1,
extra: {
allowedTables: ['orders'],
blockedTables: [],
maskedColumns: { 'orders.email': 'redact' },
maxRows: 200,
maxJoinTables: 4,
},
} as any,
sql: 'SELECT email AS e FROM orders LIMIT 10',
agentId: 3,
userId: 5,
toolCallId: 'tool-masked',
})).rejects.toThrow('mysql_sql_rejected');
expect(service.poolManager.getPool).not.toHaveBeenCalled();
expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
status: 'rejected',
rejectReason: 'masked_column_denied',
}));
});
it('audits rejected SQL', async () => {
const service = new MysqlQueryService();
service.poolManager = { getPool: jest.fn() } as any;
service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;
await expect(service.executeReadOnly({
source: { id: 1, extra: { allowedTables: ['orders'], blockedTables: [] } } as any,
sql: 'DROP TABLE orders',
agentId: 3,
userId: 5,
toolCallId: 'tool-2',
})).rejects.toThrow('mysql_sql_rejected');
expect(service.poolManager.getPool).not.toHaveBeenCalled();
expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
status: 'rejected',
}));
});
});
```
- [ ] **Step 6: Run query service test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_query_service.test.ts
```
Expected: FAIL because `MysqlQueryService` is not implemented.
- [ ] **Step 7: Implement query service**
Extend `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with:
```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import * as crypto from 'crypto';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../entity/data_source_query_audit.js';
import { MysqlPoolManager } from './mysql_pool.js';
export interface ExecuteReadOnlyParams {
source: NetaClawDataSourceEntity;
sql: string;
agentId?: number;
userId?: number;
toolCallId?: string;
}
function hashSql(sql: string): string {
return crypto.createHash('sha256').update(sql).digest('hex');
}
function sanitizeMysqlError(err: any): string {
const code = err?.code || err?.sqlState || err?.errno;
return code ? `mysql_query_failed:${code}` : 'mysql_query_failed';
}
@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlQueryService {
@Inject()
poolManager: MysqlPoolManager;
@InjectEntityModel(NetaClawDataSourceQueryAuditEntity)
auditRepo: Repository<NetaClawDataSourceQueryAuditEntity>;
async executeReadOnly(params: ExecuteReadOnlyParams) {
const start = Date.now();
const extra = params.source.extra || {};
const guard = validateMysqlReadOnlySql(params.sql, {
allowedTables: extra.allowedTables || [],
blockedTables: extra.blockedTables || [],
maxJoinTables: extra.maxJoinTables || 4,
maxRows: extra.maxRows || 200,
maskedColumns: extra.maskedColumns || {},
});
if (!guard.ok) {
await this.writeAudit(params, 'rejected', start, 0, guard.reason);
throw new Error(`mysql_sql_rejected: ${guard.reason}`);
}
try {
const pool = await this.poolManager.getPool(params.source);
const maxRows = Math.max(1, Math.min(Number(extra.maxRows ?? 200), 1000));
const appendedLimit = guard.limitedSql.includes('?');
const sql = guard.limitedSql.includes('?') ? guard.limitedSql : guard.limitedSql;
const sqlParams = appendedLimit ? [maxRows + 1] : [];
const [rows, fields] = await pool.query({ sql, timeout: Math.max(1000, Math.min(Number(extra.queryTimeoutMs ?? 10000), 30000)) } as any, sqlParams);
const fetchedRows = Array.isArray(rows) ? rows as any[] : [];
const truncated = appendedLimit && fetchedRows.length > maxRows;
const resultRows = truncated ? fetchedRows.slice(0, Math.min(fetchedRows.length, maxRows)) : fetchedRows;
await this.writeAudit(params, 'success', start, resultRows.length, null);
return {
columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : Object.keys(resultRows[0] || {}),
rows: resultRows,
rowCount: resultRows.length,
truncated,
elapsedMs: Date.now() - start,
sql: params.sql,
};
} catch (err: any) {
await this.writeAudit(params, 'failed', start, 0, null, err?.code || err?.errno || err?.sqlState);
throw new Error(sanitizeMysqlError(err));
}
}
private async writeAudit(
params: ExecuteReadOnlyParams,
status: 'success' | 'rejected' | 'failed',
start: number,
rowCount: number,
rejectReason?: string | null,
errorCode?: string | number | null
) {
await this.auditRepo.save({
dataSourceId: params.source.id,
agentId: params.agentId ?? null,
userId: params.userId ?? null,
toolCallId: params.toolCallId ?? null,
sqlHash: hashSql(params.sql),
sqlPreview: params.sql.slice(0, 1000),
status,
rejectReason: rejectReason ?? null,
elapsedMs: Date.now() - start,
rowCount,
errorCode: errorCode ? String(errorCode) : null,
} as any);
}
}
```
- [ ] **Step 8: Run schema/query tests**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_sql_guard.test.ts
```
Expected: PASS.
- [ ] **Step 9: Commit**
```bash
git add packages/backend/src/modules/netaclaw/service/mysql_schema.ts packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_schema_mapper.test.ts packages/backend/test/mysql_query_service.test.ts
git commit -m "feat: add mysql schema and query services"
```
---
## Task 5: MySQL Agent Tools And Resolver Wiring
**Files:**
- Create: `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/catalog.ts`
- Modify: `packages/backend/src/modules/netaclaw/tools/manifest.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`
- Test: `packages/backend/test/tool_resolver.test.ts`
- [ ] **Step 1: Write failing resolver tests**
Append to `packages/backend/test/tool_resolver.test.ts`:
```ts
it('resolves mysql toolset tools when enabled for an agent', async () => {
service.toolRegistry = {
all: jest.fn().mockResolvedValue([
{ name: 'mysql_list_sources', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
{ name: 'mysql_schema', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
{ name: 'mysql_table_sample', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
{ name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
]),
} as any;
service.skillLoader = {} as any;
service.dataSourceService = { listForAgent: jest.fn().mockResolvedValue([]) } as any;
service.mysqlIntrospection = {} as any;
service.mysqlQuery = {} as any;
const result = await service.resolve({
agent: { id: 9, toolsets: ['mysql'], tools: { disabled: [] } } as any,
modelCapability: 'text',
});
expect(result.toolNames).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
expect(result.tools.map(tool => tool.name)).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
});
it('routes mysql tools through main process proxy for subagent workers', async () => {
service.toolRegistry = {
all: jest.fn().mockResolvedValue([
{ name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
]),
} as any;
service.skillLoader = {} as any;
service.dataSourceService = {} as any;
service.mysqlIntrospection = {} as any;
service.mysqlQuery = {} as any;
const result = await service.resolve({
agent: {
tools: { inheritCoreTools: false, enabled: ['mysql_query'], disabled: [] },
} as any,
modelCapability: 'text',
delegationRole: 'subagent',
});
expect(result.toolManifest).toEqual([
expect.objectContaining({
name: 'mysql_query',
supportedInWorker: false,
workerRoutingHint: 'main-process-proxy',
}),
]);
});
```
- [ ] **Step 2: Run resolver test to verify RED**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts
```
Expected: FAIL because mysql tools are not registered or injected.
- [ ] **Step 3: Implement MySQL tools**
Create `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`:
```ts
import { Type } from '@sinclair/typebox';
import { AgentToolWithMeta, jsonResult } from '../common.js';
import { registerSchema } from '../catalog.js';
import { NetaClawDataSourceService } from '../../service/data_source.js';
import { MysqlIntrospectionService } from '../../service/mysql_schema.js';
import { MysqlQueryService } from '../../service/mysql_query.js';
export const TOOLSET_MYSQL = 'mysql' as const;
const Runtime = Type.Optional(Type.Any());
const SourceParam = Type.Object({
source: Type.String({ description: '数据源名称' }),
_netaRuntime: Runtime,
});
function readAgentId(params: any): number {
const id = params?._netaRuntime?.currentAgent?.id;
if (!id) throw new Error('current_agent_id_missing');
return Number(id);
}
function readUserId(params: any): number | undefined {
const id = params?._netaRuntime?.userId;
return id === undefined || id === null ? undefined : Number(id);
}
export function createMysqlListSourcesTool(deps: {
dataSourceService: NetaClawDataSourceService;
}): AgentToolWithMeta<typeof Type.Object<any>, unknown> {
const Params = Type.Object({ _netaRuntime: Runtime });
return {
name: 'mysql_list_sources',
label: 'MySQL 数据源列表',
description: '列出当前 Agent 授权可用的 MySQL 数据源,不返回连接密码。',
parameters: Params,
async execute(_id, params) {
return jsonResult({ sources: await deps.dataSourceService.listForAgent(readAgentId(params)) });
},
} as any;
}
export function createMysqlSchemaTool(deps: {
dataSourceService: NetaClawDataSourceService;
mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
const Params = Type.Intersect([SourceParam, Type.Object({
tables: Type.Optional(Type.Array(Type.String())),
})]);
return {
name: 'mysql_schema',
label: 'MySQL Schema',
description: '读取授权 MySQL 表的字段、索引、主键和外键信息。',
parameters: Params,
async execute(_id, params) {
const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
return jsonResult(await deps.mysqlIntrospection.listSchema(source, { tables: params.tables }));
},
};
}
export function createMysqlTableSampleTool(deps: {
dataSourceService: NetaClawDataSourceService;
mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
const Params = Type.Intersect([SourceParam, Type.Object({
table: Type.String(),
columns: Type.Optional(Type.Array(Type.String())),
limit: Type.Optional(Type.Number()),
})]);
return {
name: 'mysql_table_sample',
label: 'MySQL 表样例',
description: '读取授权表的少量样例值,用于判断字段含义和 JOIN key。',
parameters: Params,
async execute(_id, params) {
const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
return jsonResult(await deps.mysqlIntrospection.sampleTable(source, {
table: params.table,
columns: params.columns,
limit: params.limit,
}));
},
};
}
export function createMysqlQueryTool(deps: {
dataSourceService: NetaClawDataSourceService;
mysqlQuery: MysqlQueryService;
}): AgentToolWithMeta<any, unknown> {
const Params = Type.Intersect([SourceParam, Type.Object({
sql: Type.String(),
})]);
return {
name: 'mysql_query',
label: 'MySQL 查询',
description: '执行授权数据源上的只读 SELECT SQL。支持授权表范围内的跨表 JOIN。',
parameters: Params,
async execute(id, params) {
const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
return jsonResult(await deps.mysqlQuery.executeReadOnly({
source,
sql: params.sql,
agentId: readAgentId(params),
userId: readUserId(params),
toolCallId: id,
}));
},
};
}
for (const name of ['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']) {
registerSchema({
name,
toolset: TOOLSET_MYSQL,
description: name,
visibility: 'tool',
capability: 'text',
isCore: false,
canDisable: true,
});
}
```
- [ ] **Step 4: Wire catalog, manifest, resolver**
Modify `packages/backend/src/modules/netaclaw/tools/catalog.ts`:
```ts
export const TOOLSET_MYSQL = 'mysql' as const;
import './builtin/mysql.js';
```
Modify `packages/backend/src/modules/netaclaw/tools/manifest.ts`:
```ts
const MAIN_PROCESS_PROXY_TOOLS = new Set([
'write_file',
'edit',
'patch',
'read_skill',
'read_skill_file',
'skill_manage',
'memory_save',
'memory_recall',
'mysql_list_sources',
'mysql_schema',
'mysql_table_sample',
'mysql_query',
]);
export function toToolKind(name: string): SubagentToolManifestItem['kind'] {
if (name.startsWith('mysql_')) return 'custom';
if (name.startsWith('memory_')) return 'memory';
if (name.startsWith('read_skill') || name === 'skill_manage') return 'skill';
if (name === 'delegate_task' || name === 'delegate_parallel') return 'delegation';
if (name === 'escalate') return 'admin';
if (
name === 'bash'
|| name === 'read_file'
|| name === 'write_file'
|| name === 'list_dir'
|| name === 'find_files'
|| name === 'grep'
|| name === 'edit'
|| name === 'patch'
|| name === 'clarify'
) {
return 'builtin';
}
return 'custom';
}
```
Modify `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`:
- Import services and factories:
```ts
import { NetaClawDataSourceService } from './data_source.js';
import { MysqlIntrospectionService } from './mysql_schema.js';
import { MysqlQueryService } from './mysql_query.js';
import {
createMysqlListSourcesTool,
createMysqlQueryTool,
createMysqlSchemaTool,
createMysqlTableSampleTool,
} from '../tools/builtin/mysql.js';
```
- Add injected services:
```ts
@Inject()
dataSourceService: NetaClawDataSourceService;
@Inject()
mysqlIntrospection: MysqlIntrospectionService;
@Inject()
mysqlQuery: MysqlQueryService;
```
- Add to `getBaseToolMap()`:
```ts
['mysql_list_sources', createMysqlListSourcesTool({ dataSourceService: this.dataSourceService })],
['mysql_schema', createMysqlSchemaTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_table_sample', createMysqlTableSampleTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_query', createMysqlQueryTool({ dataSourceService: this.dataSourceService, mysqlQuery: this.mysqlQuery })],
```
- [ ] **Step 5: Run resolver tests**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts
```
Expected: PASS.
- [ ] **Step 6: Commit**
```bash
git add packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts packages/backend/src/modules/netaclaw/tools/catalog.ts packages/backend/src/modules/netaclaw/tools/manifest.ts packages/backend/src/modules/netaclaw/service/tool_resolver.ts packages/backend/test/tool_resolver.test.ts
git commit -m "feat: register mysql agent tools"
```
---
## Task 6: Admin Controller And Prompt Skill
**Files:**
- Create: `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`
- Create: `packages/backend/skills/data-analyst-mysql/SKILL.md`
- [ ] **Step 1: Implement admin controller**
Create `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`:
```ts
import { Body, Controller, Get, Inject, Post, Provide, Query } from '@midwayjs/core';
import { NetaClawDataSourceService } from '../../service/data_source.js';
@Provide()
@Controller('/admin/netaclaw/data-source')
export class NetaClawDataSourceAdminController {
@Inject()
dataSourceService: NetaClawDataSourceService;
@Get('/list')
async list(@Query('agentId') agentId?: number) {
if (agentId) {
return { code: 1000, data: await this.dataSourceService.listForAgent(Number(agentId)) };
}
return { code: 1000, data: await this.dataSourceService.listAdminSafe() };
}
@Post('/save')
async save(@Body() body: any) {
return { code: 1000, data: await this.dataSourceService.saveConfig(body) };
}
@Post('/delete')
async delete(@Body() body: { id: number }) {
await this.dataSourceService.delete(Number(body.id));
return { code: 1000, message: 'success' };
}
@Post('/test')
async test(@Body() body: any) {
const result = await this.dataSourceService.testConnection(body);
return result.ok
? { code: 1000, data: result }
: { code: 1001, message: result.error || 'connection_failed', data: result };
}
}
```
- [ ] **Step 2: Add prompt skill**
Create `packages/backend/skills/data-analyst-mysql/SKILL.md`:
```md
---
name: data-analyst-mysql
description: 使用 MySQL 工具进行只读智能问数,支持 schema 分析、跨表 JOIN、口径澄清和 SQL 结果解释。
version: 1.0.0
metadata:
skillType: llm
tags: [mysql, data-analysis, question-answering]
conditions:
requires_tools: ["mysql_list_sources", "mysql_schema", "mysql_query"]
---
# MySQL 智能问数
你负责使用 MySQL 工具回答业务数据问题。所有数据库访问必须通过 `mysql_*` 工具完成。
## 工作流程
1. 先调用 `mysql_list_sources` 查看当前 Agent 可用的数据源。
2. 根据用户问题选择数据源;如果无法判断,先澄清。
3. 调用 `mysql_schema` 读取相关表结构、字段、主键、索引和外键。
4. 跨表 JOIN 时优先使用外键;没有外键时,根据字段名、类型、索引和 `mysql_table_sample` 的样例值推断关联。
5. 业务口径不明确时必须先澄清,不要硬猜。
6. 只调用 `mysql_query` 执行只读 SELECT。
## JOIN 规则
- JOIN 必须有明确的 `ON``USING` 条件。
- 不确定关联字段时,先解释拟采用的关联并询问用户。
- 若关联来自推断,最终回答必须说明“该关联为推断”。
- 查询超时或被拒绝时,缩小时间范围、减少表数量或向用户澄清。
## 回答格式
回答必须包含:
- 结论。
- SQL。
- 口径说明特别是时间范围、筛选条件、JOIN 字段。
- 限制说明,例如 LIMIT、样例、脱敏列、推断关联或字段缺失。
不要暴露数据库 host、账号、密码、连接串或未授权表信息。
不要承诺“全量统计”,除非 SQL 口径和过滤范围明确。
```
- [ ] **Step 3: Run build or focused tests**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts test/skill_env_schema.test.ts
```
Expected: PASS.
- [ ] **Step 4: Commit**
```bash
git add packages/backend/src/modules/netaclaw/controller/admin/data_source.ts packages/backend/skills/data-analyst-mysql/SKILL.md
git commit -m "feat: add mysql data source admin api and analyst skill"
```
---
## Task 7: Verification And Documentation Sync
**Files:**
- Modify: `docs/code-wiki/entities/tool-system.md`
- Modify: `docs/code-wiki/entities/skill-system.md`
- Modify: `docs/code-wiki/entities/netaclaw-module.md`
- Modify: `docs/code-wiki/comparisons/database-entity-overview.md`
- [ ] **Step 1: Update code wiki**
Add concise references:
- `tool-system.md`: add `mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query` under a `MySQL 问数` toolset.
- `skill-system.md`: add `data-analyst-mysql` as a prompt skill.
- `netaclaw-module.md`: increment table count and list `netaclaw_data_source`, `netaclaw_data_source_query_audit`.
- `database-entity-overview.md`: add field summary for both new tables.
- [ ] **Step 2: Run all focused backend tests**
Run:
```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/mysql_sql_guard.test.ts test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_pool_manager.test.ts test/tool_resolver.test.ts test/entity_exports.test.ts
```
Expected: PASS.
- [ ] **Step 3: Run backend build**
Run:
```bash
pnpm --filter @neta/backend build
```
Expected: exit code 0.
- [ ] **Step 4: Inspect final diff**
Run:
```bash
git status --short
git diff --stat
```
Expected: only MySQL question answering implementation files and docs are changed.
- [ ] **Step 5: Commit**
```bash
git add docs/code-wiki packages/backend
git commit -m "docs: update code wiki for mysql question answering"
```
---
## Final Verification Checklist
- [ ] SQL guard rejects `SHOW`, `DESCRIBE`, `EXPLAIN`, comments, semicolons, DML, DDL, `UNION`, CTE, user variables, dangerous functions, blocked tables, unallowed tables, missing JOIN condition, too many JOIN tables.
- [ ] Data source password is AES-GCM encrypted; admin APIs never return password ciphertext/plaintext, and `mysql_list_sources` never returns host, username, password, connection string, or permission details.
- [ ] `mysql_schema` returns only authorized table metadata and marks masked columns.
- [ ] `mysql_table_sample` validates table and column identifiers against schema plus allowlist/blocklist, and rejects masked columns.
- [ ] `mysql_query` rejects masked column references, explicit LIMIT above `maxRows`, missing per-JOIN `ON/USING`, blocked tables, and unallowed tables before querying MySQL.
- [ ] `mysql_query` writes audit for success, rejected, and failed SQL.
- [ ] MySQL tools are visible only when selected by `mysql` toolset or explicit tool config.
- [ ] Subagent worker route for MySQL tools is `main-process-proxy`, not `worker-local`.
- [ ] `data-analyst-mysql` skill is discoverable and condition-gated by MySQL tools.
- [ ] Focused tests pass.
- [ ] Backend build passes.