# MySQL Question Answering Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a MySQL-only, read-only intelligent data questioning backend path for Neta Agents, with safe data-source configuration, schema/sample/query tools, cross-table JOIN support, audit, and a prompt skill.

**Architecture:** Add a `mysql` toolset backed by `netaclaw_data_source`, `SecretCryptoService`, `MysqlPoolManager`, schema/query services, and guarded Tool implementations. Security-critical rules live in services and tools; `data-analyst-mysql` only guides Agent behavior.

**Tech Stack:** Midway.js, TypeORM, mysql2/promise, TypeBox tool schemas, Jest, NetaClaw Tool Catalog/Resolver/Runtime Policy, prompt Skill.

---

## File Structure

Create:

- `packages/backend/src/modules/netaclaw/entity/data_source.ts`: MySQL data-source configuration entity.
- `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`: query audit entity.
- `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`: shared AES-256-GCM helper for new encrypted secrets.
- `packages/backend/src/modules/netaclaw/service/data_source.ts`: admin/runtime data-source lookup, authorization, save, and test connection service.
- `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`: cached mysql2 pool manager.
- `packages/backend/src/modules/netaclaw/service/mysql_schema.ts`: information_schema mapping and schema/sample helpers.
- `packages/backend/src/modules/netaclaw/service/mysql_query.ts`: read-only SQL execution, masked-column rejection, row limits, audit.
- `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`: `mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query`.
- `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`: admin APIs for data-source list/save/test/delete.
- `packages/backend/skills/data-analyst-mysql/SKILL.md`: prompt skill for MySQL data analysis.
- `packages/backend/test/secret_crypto.test.ts`
- `packages/backend/test/mysql_sql_guard.test.ts`
- `packages/backend/test/mysql_schema_mapper.test.ts`
- `packages/backend/test/mysql_query_service.test.ts`
- `packages/backend/test/mysql_pool_manager.test.ts`
- `packages/backend/test/data_source_projection.test.ts`

Modify:

- `packages/backend/src/entities.ts`: add data source and query audit entity imports to the generated entity list. This file is marked generated, but existing tests assert it directly, so update it for this change.
- `packages/backend/src/modules/netaclaw/tools/catalog.ts`: add `TOOLSET_MYSQL` and import `./builtin/mysql.js`.
- `packages/backend/src/modules/netaclaw/tools/manifest.ts`: route `mysql_*` tools as main-process proxy, classify as custom/text/parallel.
- `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`: inject MySQL tool dependencies into runtime tool map.
- `packages/backend/test/entity_exports.test.ts`: assert new entities are exported.
- `packages/backend/test/tool_resolver.test.ts`: assert mysql toolset resolution and subagent route.

Do not implement frontend UI in this pass. The admin controller is enough for Phase 1/2 backend configuration.
---

## Task 1: Entities And Secret Crypto

**Files:**

- Create: `packages/backend/src/modules/netaclaw/entity/data_source.ts`
- Create: `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`
- Create: `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`
- Modify: `packages/backend/src/entities.ts`
- Modify: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/secret_crypto.test.ts`
- Test: `packages/backend/test/entity_exports.test.ts`
- Test: `packages/backend/test/data_source_projection.test.ts`

- [ ] **Step 1: Write the failing secret crypto test**

Add `packages/backend/test/secret_crypto.test.ts`:

```ts
import { SecretCryptoService } from '../src/modules/netaclaw/service/secret_crypto.js';

describe('SecretCryptoService', () => {
  const originalNetaSecret = process.env.NETA_SECRET_KEY;
  const originalAppSecret = process.env.APP_SECRET;

  beforeEach(() => {
    process.env.NETA_SECRET_KEY = 'unit-test-secret-key';
    delete process.env.APP_SECRET;
  });

  afterEach(() => {
    if (originalNetaSecret === undefined) delete process.env.NETA_SECRET_KEY;
    else process.env.NETA_SECRET_KEY = originalNetaSecret;
    if (originalAppSecret === undefined) delete process.env.APP_SECRET;
    else process.env.APP_SECRET = originalAppSecret;
  });

  it('encrypts text without embedding the plaintext', () => {
    const service = new SecretCryptoService();
    const encrypted = service.encryptText('mysql-password-123');
    const envelope = JSON.parse(Buffer.from(encrypted, 'base64').toString('utf8'));

    expect(encrypted).not.toContain('mysql-password-123');
    expect(envelope).toEqual(expect.objectContaining({ v: 1, alg: 'aes-256-gcm' }));
    expect(envelope.iv).toEqual(expect.any(String));
    expect(envelope.tag).toEqual(expect.any(String));
    expect(envelope.ct).toEqual(expect.any(String));
    expect(service.decryptText(encrypted)).toBe('mysql-password-123');
  });

  it('encrypts and decrypts JSON values', () => {
    const service = new SecretCryptoService();
    const encrypted =
      service.encryptJson({ password: 'secret', token: 'abc' });

    expect(encrypted).not.toContain('secret');
    expect(service.decryptJson(encrypted)).toEqual({ password: 'secret', token: 'abc' });
  });

  it('rejects tampered ciphertext', () => {
    const service = new SecretCryptoService();
    const encrypted = service.encryptText('secret');
    const tampered = encrypted.slice(0, -2) + 'xx';

    expect(() => service.decryptText(tampered)).toThrow();
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```

Expected: FAIL because `secret_crypto.ts` does not exist.

- [ ] **Step 3: Implement `SecretCryptoService`**

Create `packages/backend/src/modules/netaclaw/service/secret_crypto.ts`:

```ts
import { Provide, Scope, ScopeEnum } from '@midwayjs/core';
import * as crypto from 'crypto';

@Provide()
@Scope(ScopeEnum.Singleton)
export class SecretCryptoService {
  private readonly algorithm = 'aes-256-gcm' as const;
  private readonly ivLength = 12;

  private deriveKey(): Buffer {
    const raw = process.env.NETA_SECRET_KEY || process.env.APP_SECRET;
    if (!raw) {
      throw new Error('NETA_SECRET_KEY or APP_SECRET is required for secret encryption');
    }
    return crypto.createHash('sha256').update(raw).digest();
  }

  encryptText(value: string): string {
    const iv = crypto.randomBytes(this.ivLength);
    const cipher = crypto.createCipheriv(this.algorithm, this.deriveKey(), iv);
    const encrypted = Buffer.concat([cipher.update(value, 'utf8'), cipher.final()]);
    const tag = cipher.getAuthTag();
    return Buffer.from(JSON.stringify({
      v: 1,
      alg: this.algorithm,
      iv: iv.toString('base64'),
      tag: tag.toString('base64'),
      ct: encrypted.toString('base64'),
    }), 'utf8').toString('base64');
  }

  decryptText(ciphertext: string): string {
    const envelope = JSON.parse(Buffer.from(ciphertext, 'base64').toString('utf8'));
    if (!envelope || envelope.v !== 1 || envelope.alg !== this.algorithm) {
      throw new Error('Unsupported ciphertext envelope');
    }
    const iv =
      Buffer.from(envelope.iv, 'base64');
    const tag = Buffer.from(envelope.tag, 'base64');
    const encrypted = Buffer.from(envelope.ct, 'base64');
    const decipher = crypto.createDecipheriv(this.algorithm, this.deriveKey(), iv);
    decipher.setAuthTag(tag);
    return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8');
  }

  encryptJson(value: Record<string, string>): string {
    return this.encryptText(JSON.stringify(value));
  }

  decryptJson(ciphertext: string): Record<string, string> {
    const parsed = JSON.parse(this.decryptText(ciphertext));
    if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
      throw new Error('Invalid encrypted JSON payload');
    }
    return Object.fromEntries(Object.entries(parsed).map(([key, value]) => [key, String(value)]));
  }
}
```

- [ ] **Step 4: Run test to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts
```

Expected: PASS.

- [ ] **Step 5: Write failing entity export test**

Modify `packages/backend/test/entity_exports.test.ts`:

```ts
import { entities } from '../src/entities.js';
import { NetaClawAgentSessionEntity } from '../src/modules/netaclaw/entity/agent_session.js';
import { NetaClawAgentSessionEntryEntity } from '../src/modules/netaclaw/entity/agent_session_entry.js';
import { NetaClawDataSourceEntity } from '../src/modules/netaclaw/entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../src/modules/netaclaw/entity/data_source_query_audit.js';

describe('entities exports', () => {
  it('includes SubagentSession entity', () => {
    const entityNames = entities
      .map(e => (typeof e === 'function' ?
        e.name : ''))
      .filter(Boolean);
    expect(entityNames.some(n => n.includes('SubagentSession'))).toBe(true);
  });

  it('includes session-tree MySQL entities', () => {
    expect(entities).toContain(NetaClawAgentSessionEntity);
    expect(entities).toContain(NetaClawAgentSessionEntryEntity);
  });

  it('includes MySQL question answering entities', () => {
    expect(entities).toContain(NetaClawDataSourceEntity);
    expect(entities).toContain(NetaClawDataSourceQueryAuditEntity);
  });
});
```

- [ ] **Step 6: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/entity_exports.test.ts
```

Expected: FAIL because entity files do not exist.

- [ ] **Step 7: Implement data source entities**

Create `packages/backend/src/modules/netaclaw/entity/data_source.ts`:

```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

export interface NetaClawDataSourceExtra {
  ssl?: boolean | Record<string, unknown>;
  connectTimeout?: number;
  queryTimeoutMs?: number;
  maxRows?: number;
  allowedTables?: string[];
  blockedTables?: string[];
  // Keys use the `table.column` form; values name the mask mode, e.g. 'redact'.
  maskedColumns?: Record<string, string>;
  schemaVisibility?: 'allowed-only' | 'all-names-only';
  maxJoinTables?: number;
  poolConnectionLimit?: number;
}

@Entity('netaclaw_data_source')
export class NetaClawDataSourceEntity extends BaseEntity {
  @Index({ unique: true })
  @Column({ comment: '数据源唯一名', length: 100 })
  name: string;

  @Column({ comment: '显示名称', length: 200, nullable: true })
  label: string;

  @Index()
  @Column({ comment: '数据源类型', length: 20, default: 'mysql' })
  type: string;

  @Column({ comment: 'MySQL host', length: 255 })
  host: string;

  @Column({ comment: 'MySQL port', default: 3306 })
  port: number;

  @Column({ comment: '默认数据库', length: 128 })
  database: string;

  @Column({ comment: '连接用户名', length: 255 })
  username: string;

  @Column({ comment: '加密后的连接密码', type: 'text', nullable: true })
  passwordEncrypted: string;

  @Column({ comment: '是否只读', default: 1 })
  readonly: number;

  @Index()
  @Column({ comment: '状态 0=禁用 1=启用', default: 1 })
  status:
    number;

  @Column({ comment: '授权 Agent ID 列表', type: 'json', nullable: true })
  allowedAgentIds: number[];

  @Column({ comment: '扩展配置', type: 'json', nullable: true })
  extra: NetaClawDataSourceExtra;
}
```

Create `packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts`:

```ts
import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

@Entity('netaclaw_data_source_query_audit')
export class NetaClawDataSourceQueryAuditEntity extends BaseEntity {
  @Index()
  @Column({ comment: '数据源 ID' })
  dataSourceId: number;

  @Index()
  @Column({ comment: 'Agent ID', nullable: true })
  agentId: number;

  @Index()
  @Column({ comment: '用户 ID', nullable: true })
  userId: number;

  @Column({ comment: '工具调用 ID', length: 100, nullable: true })
  toolCallId: string;

  @Column({ comment: 'SQL SHA-256', length: 64 })
  sqlHash: string;

  @Column({ comment: 'SQL 摘要', type: 'text', nullable: true })
  sqlPreview: string;

  @Index()
  @Column({ comment: '状态 success/rejected/failed', length: 20 })
  status: string;

  @Column({ comment: '拒绝原因', length: 100, nullable: true })
  rejectReason: string;

  @Column({ comment: '耗时 ms', nullable: true })
  elapsedMs: number;

  @Column({ comment: '返回行数', nullable: true })
  rowCount: number;

  @Column({ comment: '错误代码', length: 50, nullable: true })
  errorCode: string;
}
```

Modify `packages/backend/src/entities.ts` by adding imports after existing netaclaw imports:

```ts
import * as entity51 from './modules/netaclaw/entity/data_source';
import * as entity52 from './modules/netaclaw/entity/data_source_query_audit';
```

and append:

```ts
...Object.values(entity51),
...Object.values(entity52),
```

- [ ] **Step 8: Run entity and crypto tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts
```

Expected: PASS.
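
The audit entity stores `sqlHash` in a 64-character column and `sqlPreview` as text. As a minimal sketch of how those two fields might be derived, assuming whitespace is normalized before hashing and the preview is capped at 500 characters (both are assumptions, and `buildAuditFingerprint` is a hypothetical helper that does not appear elsewhere in this plan):

```ts
import { createHash } from 'node:crypto';

// Hypothetical helper: derives the two SQL-related audit fields from a raw statement.
export function buildAuditFingerprint(
  sql: string,
  previewLength = 500 // assumed cap, not specified by the plan
): { sqlHash: string; sqlPreview: string } {
  // Collapse runs of whitespace so formatting differences do not change the hash.
  const normalized = sql.replace(/\s+/g, ' ').trim();
  // A hex-encoded SHA-256 digest is always 64 characters, matching `length: 64`.
  const sqlHash = createHash('sha256').update(normalized, 'utf8').digest('hex');
  // Store only a bounded prefix so very long statements do not bloat the audit row.
  const sqlPreview = normalized.slice(0, previewLength);
  return { sqlHash, sqlPreview };
}
```

Hashing the normalized statement lets repeated queries be grouped in the audit table without depending on the preview text.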
- [ ] **Step 9: Write failing data source projection test**

Add `packages/backend/test/data_source_projection.test.ts`:

```ts
import { NetaClawDataSourceService } from '../src/modules/netaclaw/service/data_source.js';

describe('NetaClawDataSourceService projections', () => {
  const source = {
    id: 1,
    name: 'sales_prod',
    label: '销售库',
    type: 'mysql',
    host: '127.0.0.1',
    port: 3306,
    database: 'sales',
    username: 'root',
    passwordEncrypted: 'ciphertext',
    readonly: 1,
    status: 1,
    allowedAgentIds: [9],
    extra: { allowedTables: ['orders'] },
  } as any;

  it('admin projection hides passwords but keeps connection metadata', () => {
    const service = new NetaClawDataSourceService();
    expect(service.toAdminSafe(source)).toEqual(expect.objectContaining({
      name: 'sales_prod',
      host: '127.0.0.1',
      username: 'root',
      hasPassword: true,
    }));
    expect(service.toAdminSafe(source)).not.toHaveProperty('passwordEncrypted');
  });

  it('agent projection hides host username password and permission details', () => {
    const service = new NetaClawDataSourceService();
    expect(service.toAgentSummary(source)).toEqual({
      name: 'sales_prod',
      label: '销售库',
      database: 'sales',
      status: 1,
    });
  });
});
```

- [ ] **Step 10: Run projection test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/data_source_projection.test.ts
```

Expected: FAIL because `toAdminSafe` and `toAgentSummary` do not exist.
- [ ] **Step 11: Implement safe projections**

In `packages/backend/src/modules/netaclaw/service/data_source.ts`, define the projection types:

```ts
export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;
```

Replace any existing unsafe projection helper with:

```ts
toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
  const { passwordEncrypted, ...rest } = entity as any;
  return { ...rest, hasPassword: !!passwordEncrypted };
}

toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
  return {
    name: entity.name,
    label: entity.label,
    database: entity.database,
    status: entity.status,
  };
}
```

Update `listAdminSafe()` and `listForAgent()` to return the right projections:

```ts
async listAdminSafe(): Promise<AdminSafeDataSource[]> {
  const rows = await this.repo.find({ order: { id: 'DESC' } as any });
  return rows.map(row => this.toAdminSafe(row));
}

async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
  const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
  return rows
    .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
    .map(row => this.toAgentSummary(row));
}
```

Update `saveConfig()` to return `toAdminSafe(saved)`.

- [ ] **Step 12: Run tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts test/data_source_projection.test.ts
```

Expected: PASS.
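
The `listForAgent()` filter is deny-by-default: a source whose `allowedAgentIds` is `null` or empty is visible to no agent. A small standalone sketch of that rule follows, using a hypothetical `SourceRow` shape and `visibleToAgent` function that exist only for illustration:

```ts
// Hypothetical row shape: only the fields the filter reads.
type SourceRow = { name: string; allowedAgentIds: number[] | null };

// Mirrors the filter used in listForAgent(): a missing or empty allow-list
// grants access to nobody, so new sources stay hidden until an admin opts agents in.
export function visibleToAgent(rows: SourceRow[], agentId: number): string[] {
  return rows
    .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
    .map(row => row.name);
}

const rows: SourceRow[] = [
  { name: 'unconfigured', allowedAgentIds: null },
  { name: 'empty_list', allowedAgentIds: [] },
  { name: 'sales_prod', allowedAgentIds: [9] },
];

console.log(visibleToAgent(rows, 9)); // only 'sales_prod' passes the filter
```

The same check guards `getAuthorizedSource()`, so listing and lookup agree on which sources an agent can use.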
- [ ] **Step 13: Commit**

```bash
git add packages/backend/src/modules/netaclaw/entity/data_source.ts packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts packages/backend/src/modules/netaclaw/service/secret_crypto.ts packages/backend/src/entities.ts packages/backend/test/secret_crypto.test.ts packages/backend/test/entity_exports.test.ts packages/backend/test/data_source_projection.test.ts
git commit -m "feat: add mysql data source entities and secret crypto"
```

---

## Task 2: SQL Guard

**Files:**

- Create: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_sql_guard.test.ts`

- [ ] **Step 1: Write failing SQL guard tests**

Add `packages/backend/test/mysql_sql_guard.test.ts`:

```ts
import { validateMysqlReadOnlySql } from '../src/modules/netaclaw/service/mysql_query.js';

describe('validateMysqlReadOnlySql', () => {
  const baseOptions = {
    allowedTables: ['orders', 'customers'],
    blockedTables: [],
    maxJoinTables: 4,
    maxRows: 200,
    maskedColumns: {},
  };

  it('accepts a SELECT with an explicit JOIN condition', () => {
    expect(validateMysqlReadOnlySql(
      'SELECT c.name, COUNT(*) FROM customers c JOIN orders o ON o.customer_id = c.id GROUP BY c.name LIMIT 20',
      baseOptions
    )).toEqual(expect.objectContaining({
      ok: true,
      tables: expect.arrayContaining(['customers', 'orders']),
    }));
  });

  it.each([
    'SHOW TABLES',
    'DESCRIBE orders',
    'EXPLAIN SELECT * FROM orders',
    'UPDATE orders SET amount = 1',
    'DELETE FROM orders',
    'DROP TABLE orders',
    'SELECT * FROM orders; SELECT * FROM customers',
    'SELECT * FROM orders -- comment',
    'SELECT * FROM orders # comment',
    'SELECT * FROM orders /* comment */',
    'WITH recent AS (SELECT * FROM orders) SELECT * FROM recent',
    'SELECT * FROM orders UNION SELECT * FROM customers',
    'SELECT @x := id FROM orders',
    'SELECT SLEEP(1)',
    'SELECT * INTO OUTFILE "/tmp/a" FROM orders',
  ])('rejects unsafe SQL: %s', sql => {
    const result = validateMysqlReadOnlySql(sql, baseOptions);
    expect(result.ok).toBe(false);
  });

  it('rejects unallowed tables', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM payments LIMIT 10', baseOptions);
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_not_allowed',
    }));
  });

  it('rejects blocked tables even when allowed', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10', {
      allowedTables: ['orders'],
      blockedTables: ['orders'],
      maxJoinTables: 4,
    });
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_blocked',
    }));
  });

  it('rejects explicit joins without ON or USING', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers JOIN orders LIMIT 10',
      baseOptions
    );
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects more joined tables than allowed', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM a JOIN b ON b.a_id = a.id JOIN c ON c.b_id = b.id LIMIT 10',
      { allowedTables: ['a', 'b', 'c'], blockedTables: [], maxJoinTables: 2, maxRows: 200, maskedColumns: {} }
    );
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'too_many_join_tables',
    }));
  });

  it('rejects a later JOIN that lacks its own ON or USING clause', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers c JOIN orders o ON o.customer_id = c.id JOIN invoices i LIMIT 10',
      { allowedTables: ['customers', 'orders', 'invoices'], blockedTables: [], maxJoinTables: 4, maxRows: 200, maskedColumns: {} }
    );
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects explicit LIMIT values above maxRows', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10000', baseOptions);
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'limit_exceeds_max_rows',
    }));
  });

  it('rejects SQL that references masked columns', () => {
    const result = validateMysqlReadOnlySql('SELECT email AS e FROM customers LIMIT 10', {
      allowedTables: ['customers'],
      blockedTables: [],
      maxJoinTables: 4,
      maxRows: 200,
      maskedColumns: { 'customers.email': 'redact' },
    });
    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'masked_column_denied',
    }));
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```

Expected: FAIL because `mysql_query.ts` does not export the guard.

- [ ] **Step 3: Implement SQL guard**

Create the initial `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with these exports:

```ts
export type MysqlSqlGuardOptions = {
  allowedTables?: string[];
  blockedTables?: string[];
  maxJoinTables?: number;
  maxRows?: number;
  maskedColumns?: Record<string, string>;
};

export type MysqlSqlGuardResult =
  | { ok: true; tables: string[]; limitedSql: string }
  | { ok: false; reason: string; tables?: string[] };

const DENIED_PATTERNS: Array<[RegExp, string]> = [
  [/;/, 'multiple_statements_denied'],
  [/(--|#|\/\*|\*\/)/, 'comments_denied'],
  [/^\s*(show|describe|desc|explain)\b/i, 'metadata_sql_denied'],
  [/^\s*(insert|update|delete|replace|create|alter|drop|truncate|call|grant|revoke|start|begin|commit|rollback)\b/i, 'non_select_denied'],
  [/\bunion\b/i, 'union_denied'],
  [/^\s*with\b/i, 'cte_denied'],
  [/@[a-zA-Z0-9_]+/, 'user_variable_denied'],
  [/\btemporary\b/i, 'temporary_table_denied'],
  [/\b(load_file|sleep|benchmark)\s*\(/i, 'dangerous_function_denied'],
  [/\binto\s+(out|dump)?file\b/i, 'file_output_denied'],
  [/\bfrom\s+[^,\s]+(?:\s+\w+)?\s*,\s*[^,\s]+/i, 'implicit_join_denied'],
];

function normalizeIdentifier(value: string): string {
  return value.replace(/`/g, '').split('.').pop()!.toLowerCase();
}

function stripBackticks(value: string): string {
  return value.replace(/`/g, '').toLowerCase();
}

export function extractMysqlTables(sql: string): string[] {
  const tables = new Set<string>();
  const tablePattern = /\b(?:from|join)\s+(`?[a-zA-Z0-9_]+`?(?:\.`?[a-zA-Z0-9_]+`?)?)/gi;
  let match: RegExpExecArray | null;
  while ((match = tablePattern.exec(sql))) {
    tables.add(normalizeIdentifier(match[1]));
  }
  return [...tables];
}

export function validateMysqlReadOnlySql(sql: string, options: MysqlSqlGuardOptions = {}): MysqlSqlGuardResult {
  const trimmed = String(sql || '').trim();
  if (!trimmed) return { ok: false, reason: 'sql_empty' };
  if (!/^\s*select\b/i.test(trimmed)) return { ok: false, reason: 'only_select_allowed' };

  for (const [pattern, reason] of DENIED_PATTERNS) {
    if (pattern.test(trimmed)) return { ok: false, reason };
  }

  if (!everyJoinHasCondition(trimmed)) {
    return { ok: false, reason: 'join_condition_required' };
  }

  const tables = extractMysqlTables(trimmed);
  const allowed = new Set((options.allowedTables || []).map(item => normalizeIdentifier(item)));
  const blocked = new Set((options.blockedTables || []).map(item => normalizeIdentifier(item)));

  for (const table of tables) {
    if (blocked.has(table)) return { ok: false, reason: 'table_blocked', tables };
    if (allowed.size === 0 || !allowed.has(table)) return { ok: false, reason: 'table_not_allowed', tables };
  }

  const maxJoinTables = Math.max(1, Math.min(options.maxJoinTables ?? 4, 6));
  if (tables.length > maxJoinTables) {
    return { ok: false, reason: 'too_many_join_tables', tables };
  }

  const maxRows = Math.max(1, Math.min(options.maxRows ?? 200, 1000));
  const explicitLimit = /\blimit\s+(\d+)/i.exec(trimmed);
  if (explicitLimit && Number(explicitLimit[1]) > maxRows) {
    return { ok: false, reason: 'limit_exceeds_max_rows', tables };
  }

  const normalizedSql = stripBackticks(trimmed);
  for (const key of Object.keys(options.maskedColumns || {})) {
    const [table, column] = key.toLowerCase().split('.');
    if (tables.includes(table) && new RegExp(`\\b${column}\\b`, 'i').test(normalizedSql)) {
      return { ok: false, reason: 'masked_column_denied', tables };
    }
  }

  const limitedSql = /\blimit\s+\d+/i.test(trimmed) ?
    trimmed : `SELECT * FROM (${trimmed}) AS neta_limited_query LIMIT ?`;

  return { ok: true, tables, limitedSql };
}

function everyJoinHasCondition(sql: string): boolean {
  const joinPattern = /\bjoin\b/gi;
  let match: RegExpExecArray | null;
  while ((match = joinPattern.exec(sql))) {
    const rest = sql.slice(match.index + match[0].length);
    const boundary = /\b(join|where|group\s+by|order\s+by|limit)\b/i.exec(rest);
    const segment = boundary ? rest.slice(0, boundary.index) : rest;
    if (!/\b(on|using)\b/i.test(segment)) return false;
  }
  return true;
}
```

- [ ] **Step 4: Run test to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts
```

Expected: PASS.

- [ ] **Step 5: Commit**

```bash
git add packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_sql_guard.test.ts
git commit -m "feat: add mysql read only sql guard"
```

---

## Task 3: Data Source Service And Pool Manager

**Files:**

- Create: `packages/backend/src/modules/netaclaw/service/data_source.ts`
- Create: `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`
- Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts`
- Test: `packages/backend/test/mysql_pool_manager.test.ts`

- [ ] **Step 1: Write failing pool manager tests**

Add `packages/backend/test/mysql_pool_manager.test.ts`:

```ts
import { MysqlPoolManager } from '../src/modules/netaclaw/service/mysql_pool.js';

describe('MysqlPoolManager', () => {
  it('reuses pools by data source id and clamps connectionLimit', async () => {
    const created: any[] = [];
    const createPool = jest.fn((config: any) => {
      const pool = { config, end: jest.fn().mockResolvedValue(undefined) };
      created.push(pool);
      return pool;
    });
    const manager = new MysqlPoolManager(createPool as any);
    const source: any = {
      id: 7,
      host: 'localhost',
      port: 3306,
      database: 'sales',
      username: 'root',
      passwordEncrypted: 'encrypted',
      extra: { poolConnectionLimit: 20, connectTimeout: 1000 },
    };
    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

    const first = await manager.getPool(source);
    const second = await manager.getPool(source);

    expect(first).toBe(second);
    expect(createPool).toHaveBeenCalledTimes(1);
    expect(created[0].config.connectionLimit).toBe(10);
    expect(created[0].config.password).toBe('plain');
  });

  it('closes and forgets a cached pool', async () => {
    const end = jest.fn().mockResolvedValue(undefined);
    const createPool = jest.fn(() => ({ end }));
    const manager = new MysqlPoolManager(createPool as any);
    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

    await manager.getPool({ id: 1, host: 'h', port: 3306, database: 'd', username: 'u', passwordEncrypted: 'p', extra: {} } as any);
    await manager.closePool(1);

    expect(end).toHaveBeenCalledTimes(1);
  });
});
```

- [ ] **Step 2: Run test to verify RED**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```

Expected: FAIL because `mysql_pool.ts` does not exist.

- [ ] **Step 3: Implement pool manager**

Create `packages/backend/src/modules/netaclaw/service/mysql_pool.ts`:

```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import mysql from 'mysql2/promise';
import type { Pool } from 'mysql2/promise';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';

type CreatePool = typeof mysql.createPool;

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlPoolManager {
  @Inject()
  secretCrypto: SecretCryptoService;

  // Cached pools keyed by data source id.
  private pools = new Map<number, Pool>();
  private readonly createPool: CreatePool;

  constructor(createPool: CreatePool = mysql.createPool) {
    this.createPool = createPool;
  }

  async getPool(source: NetaClawDataSourceEntity): Promise<Pool> {
    const existing = this.pools.get(source.id);
    if (existing) return existing;

    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ??
      3), 10));
    const pool = this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
    this.pools.set(source.id, pool);
    return pool;
  }

  async closePool(sourceId: number): Promise<void> {
    const pool = this.pools.get(sourceId);
    if (!pool) return;
    this.pools.delete(sourceId);
    await pool.end();
  }

  // Used by connection tests: never cached, so the caller must end() it.
  createTransientPool(source: NetaClawDataSourceEntity): Pool {
    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 1), 3));
    return this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
  }
}
```

- [ ] **Step 4: Run pool tests to verify GREEN**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts
```

Expected: PASS.
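
The pool manager clamps numeric settings with `Math.max(min, Math.min(Number(value ?? fallback), max))`. One edge case worth knowing: if a non-numeric value ever reached `extra`, `Number()` would return `NaN`, and both `Math.min` and `Math.max` propagate `NaN` instead of clamping it. A possible hardening, sketched here with a hypothetical `clampInt` helper that the plan itself does not define:

```ts
// Hypothetical helper: clamps a loosely typed setting into [min, max],
// falling back when the value is missing or not a finite number.
export function clampInt(value: unknown, fallback: number, min: number, max: number): number {
  const parsed = Number(value ?? fallback);
  // Number('abc') is NaN, and Math.min/Math.max would pass NaN straight through,
  // so replace non-finite input with the fallback before clamping.
  const safe = Number.isFinite(parsed) ? Math.trunc(parsed) : fallback;
  return Math.max(min, Math.min(safe, max));
}

console.log(clampInt(20, 3, 1, 10));        // 10: above the cap
console.log(clampInt(undefined, 3, 1, 10)); // 3: default applies
console.log(clampInt(0, 3, 1, 10));         // 1: raised to the floor
console.log(clampInt('abc', 3, 1, 10));     // 3: non-numeric falls back
```

Whether this guard is needed depends on how strictly the admin API validates `extra` before it is saved.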
- [ ] **Step 5: Implement `DataSourceService`**

Create `packages/backend/src/modules/netaclaw/service/data_source.ts`:

```ts
import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { NetaClawDataSourceEntity, NetaClawDataSourceExtra } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';
import { MysqlPoolManager } from './mysql_pool.js';

export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;

export type SaveDataSourceInput = Partial<NetaClawDataSourceEntity> & {
  password?: string;
};

@Provide()
@Scope(ScopeEnum.Singleton)
export class NetaClawDataSourceService {
  @InjectEntityModel(NetaClawDataSourceEntity)
  repo: Repository<NetaClawDataSourceEntity>;

  @Inject()
  secretCrypto: SecretCryptoService;

  @Inject()
  mysqlPoolManager: MysqlPoolManager;

  toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
    const { passwordEncrypted, ...rest } = entity as any;
    return { ...rest, hasPassword: !!passwordEncrypted };
  }

  toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
    return {
      name: entity.name,
      label: entity.label,
      database: entity.database,
      status: entity.status,
    };
  }

  async listAdminSafe(): Promise<AdminSafeDataSource[]> {
    const rows = await this.repo.find({ order: { id: 'DESC' } as any });
    return rows.map(row => this.toAdminSafe(row));
  }

  async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
    const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
    return rows
      .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
      .map(row => this.toAgentSummary(row));
  }

  async getAuthorizedSource(name: string, agentId: number): Promise<NetaClawDataSourceEntity> {
    const source = await this.repo.findOne({ where: { name, type: 'mysql', status: 1 } as any });
    if (!source) throw new Error('data_source_not_found');
    if (!Array.isArray(source.allowedAgentIds) ||
        !source.allowedAgentIds.includes(agentId)) {
      throw new Error('data_source_not_authorized');
    }
    return source;
  }

  normalizeExtra(extra?: NetaClawDataSourceExtra | null): NetaClawDataSourceExtra {
    return {
      allowedTables: extra?.allowedTables ?? [],
      blockedTables: extra?.blockedTables ?? [],
      maskedColumns: extra?.maskedColumns ?? {},
      schemaVisibility: extra?.schemaVisibility ?? 'allowed-only',
      queryTimeoutMs: Math.max(1000, Math.min(Number(extra?.queryTimeoutMs ?? 10000), 30000)),
      maxRows: Math.max(1, Math.min(Number(extra?.maxRows ?? 200), 1000)),
      maxJoinTables: Math.max(1, Math.min(Number(extra?.maxJoinTables ?? 4), 6)),
      poolConnectionLimit: Math.max(1, Math.min(Number(extra?.poolConnectionLimit ?? 3), 10)),
      ssl: extra?.ssl,
      connectTimeout: extra?.connectTimeout,
    };
  }

  async saveConfig(input: SaveDataSourceInput): Promise<AdminSafeDataSource> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const entity = this.repo.create({
      ...(existing || {}),
      ...input,
      type: 'mysql',
      readonly: 1,
      status: input.status ?? existing?.status ?? 1,
      port: Number(input.port ?? existing?.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
      extra: this.normalizeExtra(input.extra),
    });
    if (input.password) {
      entity.passwordEncrypted = this.secretCrypto.encryptText(input.password);
    }
    const saved = await this.repo.save(entity);
    // Drop any cached pool so the next query picks up the new configuration.
    await this.mysqlPoolManager.closePool(saved.id);
    return this.toAdminSafe(saved);
  }

  async delete(id: number): Promise<void> {
    await this.repo.delete(id);
    await this.mysqlPoolManager.closePool(id);
  }

  async testConnection(input: SaveDataSourceInput): Promise<{ ok: boolean; error?: string }> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const source = this.repo.create({
      ...(existing || {}),
      ...input,
      id: input.id ?? -1,
      type: 'mysql',
      readonly: 1,
      port: Number(input.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ?
input.allowedAgentIds : [], extra: this.normalizeExtra(input.extra), passwordEncrypted: input.password ? this.secretCrypto.encryptText(input.password) : input.passwordEncrypted ?? existing?.passwordEncrypted, } as any); let pool: any; try { pool = this.mysqlPoolManager.createTransientPool(source); await pool.query('SELECT 1 AS ok'); return { ok: true }; } catch (err: any) { return { ok: false, error: sanitizeMysqlConnectionError(err) }; } finally { if (pool) await pool.end(); } } } function sanitizeMysqlConnectionError(err: any): string { return err?.code || err?.sqlState || err?.errno ? `mysql_connection_failed:${err.code || err.sqlState || err.errno}` : 'mysql_connection_failed'; } ``` - [ ] **Step 6: Run focused tests** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts test/secret_crypto.test.ts ``` Expected: PASS. - [ ] **Step 7: Commit** ```bash git add packages/backend/src/modules/netaclaw/service/data_source.ts packages/backend/src/modules/netaclaw/service/mysql_pool.ts packages/backend/test/mysql_pool_manager.test.ts git commit -m "feat: add mysql data source services" ``` --- ## Task 4: Schema Mapping And Query Execution **Files:** - Create: `packages/backend/src/modules/netaclaw/service/mysql_schema.ts` - Modify: `packages/backend/src/modules/netaclaw/service/mysql_query.ts` - Test: `packages/backend/test/mysql_schema_mapper.test.ts` - Test: `packages/backend/test/mysql_query_service.test.ts` - [ ] **Step 1: Write failing schema mapper test** Add `packages/backend/test/mysql_schema_mapper.test.ts`: ```ts import { mapMysqlSchemaRows } from '../src/modules/netaclaw/service/mysql_schema.js'; describe('mapMysqlSchemaRows', () => { it('maps columns, primary keys, indexes and foreign keys', () => { const result = mapMysqlSchemaRows({ columns: [ { tableName: 'orders', tableComment: '订单', columnName: 'id', columnType: 'int', isNullable: 'NO', columnKey: 'PRI', columnComment: 'ID' }, { tableName: 'orders', tableComment: 
'订单', columnName: 'customer_id', columnType: 'int', isNullable: 'NO', columnKey: 'MUL', columnComment: '客户' }, ], indexes: [ { tableName: 'orders', indexName: 'PRIMARY', columnName: 'id', nonUnique: 0, seqInIndex: 1 }, { tableName: 'orders', indexName: 'idx_customer', columnName: 'customer_id', nonUnique: 1, seqInIndex: 1 }, ], foreignKeys: [ { tableName: 'orders', columnName: 'customer_id', constraintName: 'fk_orders_customer', referencedTableName: 'customers', referencedColumnName: 'id' }, ], maskedColumns: { 'orders.customer_id': 'redact' }, }); expect(result.tables).toEqual([ expect.objectContaining({ name: 'orders', comment: '订单', primaryKey: ['id'], columns: [ expect.objectContaining({ name: 'id', type: 'int', nullable: false, key: 'PRI', masked: false }), expect.objectContaining({ name: 'customer_id', type: 'int', nullable: false, key: 'MUL', masked: true, maskMode: 'redact' }), ], indexes: expect.arrayContaining([ expect.objectContaining({ name: 'idx_customer', columns: ['customer_id'], unique: false }), ]), foreignKeys: [ expect.objectContaining({ column: 'customer_id', referencedTable: 'customers', referencedColumn: 'id' }), ], }), ]); }); }); ``` - [ ] **Step 2: Run schema mapper test to verify RED** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts ``` Expected: FAIL because `mysql_schema.ts` does not exist. 
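
The fixture above only exercises single-column indexes. For a composite index the mapper is expected to order columns by `seqInIndex`, even when the `information_schema.STATISTICS` rows arrive out of order. A minimal standalone sketch of that grouping follows; `groupIndexRows`, `IndexRow`, and `IndexSummary` are hypothetical names used only for illustration, and the row shape simply mirrors the `indexes` fixture:

```typescript
// Hypothetical standalone sketch, not the plan's mapper itself.
interface IndexRow {
  tableName: string;
  indexName: string;
  columnName: string;
  nonUnique: number;
  seqInIndex: number; // 1-based position of the column inside the index
}

interface IndexSummary {
  tableName: string;
  name: string;
  unique: boolean;
  columns: string[];
}

function groupIndexRows(rows: IndexRow[]): IndexSummary[] {
  const groups = new Map<string, IndexSummary>();
  for (const row of rows) {
    const key = `${row.tableName}.${row.indexName}`;
    let group = groups.get(key);
    if (!group) {
      group = { tableName: row.tableName, name: row.indexName, unique: row.nonUnique === 0, columns: [] };
      groups.set(key, group);
    }
    // Place each column by position so out-of-order rows still yield the right order.
    group.columns[row.seqInIndex - 1] = row.columnName;
  }
  // Drop any holes left by positions that never arrived.
  return Array.from(groups.values()).map(g => ({ ...g, columns: g.columns.filter(Boolean) }));
}

const grouped = groupIndexRows([
  { tableName: 'orders', indexName: 'idx_customer_created', columnName: 'created_at', nonUnique: 1, seqInIndex: 2 },
  { tableName: 'orders', indexName: 'idx_customer_created', columnName: 'customer_id', nonUnique: 1, seqInIndex: 1 },
]);
```

A composite-index case like this one could be added to the mapper test if column order matters for JOIN key inference.
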
- [ ] **Step 3: Implement schema mapper and service skeleton** Create `packages/backend/src/modules/netaclaw/service/mysql_schema.ts` with: ```ts import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core'; import { NetaClawDataSourceEntity } from '../entity/data_source.js'; import { MysqlPoolManager } from './mysql_pool.js'; type MaskMode = 'hash' | 'partial' | 'redact'; export interface MysqlSchemaRows { columns: Array<{ tableName: string; tableComment?: string; columnName: string; columnType: string; isNullable: string; columnKey?: string; columnComment?: string }>; indexes: Array<{ tableName: string; indexName: string; columnName: string; nonUnique: number; seqInIndex: number }>; foreignKeys: Array<{ tableName: string; columnName: string; constraintName: string; referencedTableName: string; referencedColumnName: string }>; maskedColumns?: Record<string, MaskMode>; } export function mapMysqlSchemaRows(rows: MysqlSchemaRows) { const tableMap = new Map<string, any>(); const getTable = (name: string, comment?: string) => { if (!tableMap.has(name)) { tableMap.set(name, { name, comment: comment || '', columns: [], primaryKey: [], indexes: [], foreignKeys: [] }); } return tableMap.get(name); }; for (const column of rows.columns) { const table = getTable(column.tableName, column.tableComment); const maskKey = `${column.tableName}.${column.columnName}`; const maskMode = rows.maskedColumns?.[maskKey]; if (column.columnKey === 'PRI') table.primaryKey.push(column.columnName); table.columns.push({ name: column.columnName, type: column.columnType, nullable: column.isNullable === 'YES', key: column.columnKey || '', comment: column.columnComment || '', masked: !!maskMode, maskMode: maskMode || null, }); } const indexGroups = new Map<string, any>(); for (const index of rows.indexes) { const key = `${index.tableName}.${index.indexName}`; if (!indexGroups.has(key)) { indexGroups.set(key, { tableName: index.tableName, name: index.indexName, unique: index.nonUnique === 0, columns: [] as string[] }); } 
indexGroups.get(key).columns[index.seqInIndex - 1] = index.columnName; } for (const index of indexGroups.values()) { getTable(index.tableName).indexes.push({ name: index.name, unique: index.unique, columns: index.columns.filter(Boolean) }); } for (const fk of rows.foreignKeys) { getTable(fk.tableName).foreignKeys.push({ name: fk.constraintName, column: fk.columnName, referencedTable: fk.referencedTableName, referencedColumn: fk.referencedColumnName, }); } return { tables: [...tableMap.values()] }; } @Provide() @Scope(ScopeEnum.Singleton) export class MysqlIntrospectionService { @Inject() poolManager: MysqlPoolManager; async listSchema(source: NetaClawDataSourceEntity, options: { tables?: string[] } = {}) { const allowedTables = source.extra?.allowedTables || []; const requested = options.tables?.length ? options.tables : allowedTables; const visibleTables = requested.filter(table => allowedTables.includes(table) && !(source.extra?.blockedTables || []).includes(table)); if (visibleTables.length === 0) return { tables: [] }; const pool = await this.poolManager.getPool(source); const placeholders = visibleTables.map(() => '?').join(','); const [columns] = await pool.query( `SELECT TABLE_NAME AS tableName, '' AS tableComment, COLUMN_NAME AS columnName, COLUMN_TYPE AS columnType, IS_NULLABLE AS isNullable, COLUMN_KEY AS columnKey, COLUMN_COMMENT AS columnComment FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders}) ORDER BY TABLE_NAME, ORDINAL_POSITION`, [source.database, ...visibleTables] ); const [indexes] = await pool.query( `SELECT TABLE_NAME AS tableName, INDEX_NAME AS indexName, COLUMN_NAME AS columnName, NON_UNIQUE AS nonUnique, SEQ_IN_INDEX AS seqInIndex FROM information_schema.STATISTICS WHERE TABLE_SCHEMA = ? 
AND TABLE_NAME IN (${placeholders}) ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX`, [source.database, ...visibleTables] ); const [foreignKeys] = await pool.query( `SELECT TABLE_NAME AS tableName, COLUMN_NAME AS columnName, CONSTRAINT_NAME AS constraintName, REFERENCED_TABLE_NAME AS referencedTableName, REFERENCED_COLUMN_NAME AS referencedColumnName FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL`, [source.database, ...visibleTables] ); return mapMysqlSchemaRows({ columns: columns as MysqlSchemaRows['columns'], indexes: indexes as MysqlSchemaRows['indexes'], foreignKeys: foreignKeys as MysqlSchemaRows['foreignKeys'], maskedColumns: source.extra?.maskedColumns, }); } async sampleTable(source: NetaClawDataSourceEntity, options: { table: string; columns?: string[]; limit?: number }) { const allowedTables = source.extra?.allowedTables || []; const blockedTables = source.extra?.blockedTables || []; if (!allowedTables.includes(options.table) || blockedTables.includes(options.table)) { throw new Error('table_not_allowed'); } const schema = await this.listSchema(source, { tables: [options.table] }); const table = schema.tables[0]; if (!table) throw new Error('table_not_found'); const visibleColumns = new Set(table.columns.map((column: any) => column.name)); const requestedColumns = options.columns?.length ? options.columns : table.columns.map((column: any) => column.name); for (const column of requestedColumns) { if (!/^[A-Za-z0-9_]+$/.test(column) || !visibleColumns.has(column)) { throw new Error('column_not_allowed'); } if (source.extra?.maskedColumns?.[`${options.table}.${column}`]) { throw new Error('masked_column_denied'); } } const safeTable = `\`${options.table.replace(/`/g, '')}\``; const safeColumns = requestedColumns.map(column => `\`${column.replace(/`/g, '')}\``).join(', '); const limit = Math.max(1, Math.min(Number(options.limit ?? 
5), 20)); const pool = await this.poolManager.getPool(source); const [rows, fields] = await pool.query(`SELECT ${safeColumns} FROM ${safeTable} LIMIT ?`, [limit]); return { columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : requestedColumns, rows: Array.isArray(rows) ? rows : [], rowCount: Array.isArray(rows) ? rows.length : 0, truncated: false, }; } } ``` - [ ] **Step 4: Run schema mapper test to verify GREEN** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts ``` Expected: PASS. - [ ] **Step 5: Write failing query service test** Add `packages/backend/test/mysql_query_service.test.ts`: ```ts import { MysqlQueryService } from '../src/modules/netaclaw/service/mysql_query.js'; describe('MysqlQueryService', () => { it('executes a guarded query and writes audit', async () => { const query = jest.fn().mockResolvedValue([ [ { id: 1, total: 99 }, ], [{ name: 'id' }, { name: 'total' }], ]); const service = new MysqlQueryService(); service.poolManager = { getPool: jest.fn().mockResolvedValue({ query }) } as any; service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any; const result = await service.executeReadOnly({ source: { id: 1, database: 'sales', extra: { allowedTables: ['orders'], blockedTables: [], maskedColumns: {}, maxRows: 200, maxJoinTables: 4, queryTimeoutMs: 10000, }, } as any, sql: 'SELECT id, total FROM orders LIMIT 10', agentId: 3, userId: 5, toolCallId: 'tool-1', }); expect(result.rows).toEqual([{ id: 1, total: 99 }]); expect(result.rowCount).toBe(1); expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({ dataSourceId: 1, agentId: 3, userId: 5, toolCallId: 'tool-1', status: 'success', rowCount: 1, })); }); it('rejects masked column references before querying mysql', async () => { const service = new MysqlQueryService(); service.poolManager = { getPool: jest.fn() } as any; service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any; await 
expect(service.executeReadOnly({ source: { id: 1, extra: { allowedTables: ['orders'], blockedTables: [], maskedColumns: { 'orders.email': 'redact' }, maxRows: 200, maxJoinTables: 4, }, } as any, sql: 'SELECT email AS e FROM orders LIMIT 10', agentId: 3, userId: 5, toolCallId: 'tool-masked', })).rejects.toThrow('mysql_sql_rejected'); expect(service.poolManager.getPool).not.toHaveBeenCalled(); expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({ status: 'rejected', rejectReason: 'masked_column_denied', })); }); it('audits rejected SQL', async () => { const service = new MysqlQueryService(); service.poolManager = { getPool: jest.fn() } as any; service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any; await expect(service.executeReadOnly({ source: { id: 1, extra: { allowedTables: ['orders'], blockedTables: [] } } as any, sql: 'DROP TABLE orders', agentId: 3, userId: 5, toolCallId: 'tool-2', })).rejects.toThrow('mysql_sql_rejected'); expect(service.poolManager.getPool).not.toHaveBeenCalled(); expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({ status: 'rejected', })); }); }); ``` - [ ] **Step 6: Run query service test to verify RED** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/mysql_query_service.test.ts ``` Expected: FAIL because `MysqlQueryService` is not implemented. 
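
The tests above rely on a SQL guard that returns `ok`, a `reason` on rejection, and a `limitedSql` string. Step 7 treats a `?` in `limitedSql` as the sign that the guard appended its own limit. As a reminder of that contract, here is a heavily simplified sketch. `sketchReadOnlyGuard`, `GuardSketchResult`, and the reason strings are hypothetical, and the sketch assumes only a trailing `LIMIT n` form. It does not check tables, JOINs, or masked columns, which the real guard must do:

```typescript
// Hypothetical, simplified stand-in for the guard contract; not the plan's validateMysqlReadOnlySql.
interface GuardSketchResult {
  ok: boolean;
  reason?: string;
  limitedSql?: string;
}

function sketchReadOnlyGuard(sql: string, maxRows: number): GuardSketchResult {
  const trimmed = sql.trim();
  // Reject statement separators and comment openers outright.
  if (/;|--|\/\*|#/.test(trimmed)) return { ok: false, reason: 'comment_or_semicolon' };
  // Only plain SELECT statements are allowed through.
  if (!/^select\b/i.test(trimmed)) return { ok: false, reason: 'not_select' };
  const explicitLimit = /\blimit\s+(\d+)$/i.exec(trimmed);
  // No explicit limit: append a placeholder so the caller can bind maxRows + 1.
  if (!explicitLimit) return { ok: true, limitedSql: `${trimmed} LIMIT ?` };
  // An explicit limit above the configured cap is rejected rather than rewritten.
  if (Number(explicitLimit[1]) > maxRows) return { ok: false, reason: 'limit_exceeds_max_rows' };
  return { ok: true, limitedSql: trimmed };
}

const appended = sketchReadOnlyGuard('SELECT id, total FROM orders', 200);
const kept = sketchReadOnlyGuard('SELECT id, total FROM orders LIMIT 10', 200);
const dropped = sketchReadOnlyGuard('DROP TABLE orders', 200);
const tooLarge = sketchReadOnlyGuard('SELECT id FROM orders LIMIT 500', 200);
```

Rejecting an oversized explicit `LIMIT` instead of silently lowering it keeps the SQL shown to the user identical to the SQL that ran.
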
- [ ] **Step 7: Implement query service** Extend `packages/backend/src/modules/netaclaw/service/mysql_query.ts` with: ```ts import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core'; import { InjectEntityModel } from '@midwayjs/typeorm'; import { Repository } from 'typeorm'; import * as crypto from 'crypto'; import { NetaClawDataSourceEntity } from '../entity/data_source.js'; import { NetaClawDataSourceQueryAuditEntity } from '../entity/data_source_query_audit.js'; import { MysqlPoolManager } from './mysql_pool.js'; export interface ExecuteReadOnlyParams { source: NetaClawDataSourceEntity; sql: string; agentId?: number; userId?: number; toolCallId?: string; } function hashSql(sql: string): string { return crypto.createHash('sha256').update(sql).digest('hex'); } function sanitizeMysqlError(err: any): string { const code = err?.code || err?.sqlState || err?.errno; return code ? `mysql_query_failed:${code}` : 'mysql_query_failed'; } @Provide() @Scope(ScopeEnum.Singleton) export class MysqlQueryService { @Inject() poolManager: MysqlPoolManager; @InjectEntityModel(NetaClawDataSourceQueryAuditEntity) auditRepo: Repository<NetaClawDataSourceQueryAuditEntity>; async executeReadOnly(params: ExecuteReadOnlyParams) { const start = Date.now(); const extra = params.source.extra || {}; const guard = validateMysqlReadOnlySql(params.sql, { allowedTables: extra.allowedTables || [], blockedTables: extra.blockedTables || [], maxJoinTables: extra.maxJoinTables || 4, maxRows: extra.maxRows || 200, maskedColumns: extra.maskedColumns || {}, }); if (!guard.ok) { await this.writeAudit(params, 'rejected', start, 0, guard.reason); throw new Error(`mysql_sql_rejected: ${guard.reason}`); } try { const pool = await this.poolManager.getPool(params.source); const maxRows = Math.max(1, Math.min(Number(extra.maxRows ?? 200), 1000)); const appendedLimit = guard.limitedSql.includes('?'); const sql = guard.limitedSql; const sqlParams = appendedLimit ? 
[maxRows + 1] : []; const [rows, fields] = await pool.query({ sql, timeout: Math.max(1000, Math.min(Number(extra.queryTimeoutMs ?? 10000), 30000)) } as any, sqlParams); const fetchedRows = Array.isArray(rows) ? rows as any[] : []; const truncated = appendedLimit && fetchedRows.length > maxRows; const resultRows = truncated ? fetchedRows.slice(0, Math.min(fetchedRows.length, maxRows)) : fetchedRows; await this.writeAudit(params, 'success', start, resultRows.length, null); return { columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : Object.keys(resultRows[0] || {}), rows: resultRows, rowCount: resultRows.length, truncated, elapsedMs: Date.now() - start, sql: params.sql, }; } catch (err: any) { await this.writeAudit(params, 'failed', start, 0, null, err?.code || err?.errno || err?.sqlState); throw new Error(sanitizeMysqlError(err)); } } private async writeAudit( params: ExecuteReadOnlyParams, status: 'success' | 'rejected' | 'failed', start: number, rowCount: number, rejectReason?: string | null, errorCode?: string | number | null ) { await this.auditRepo.save({ dataSourceId: params.source.id, agentId: params.agentId ?? null, userId: params.userId ?? null, toolCallId: params.toolCallId ?? null, sqlHash: hashSql(params.sql), sqlPreview: params.sql.slice(0, 1000), status, rejectReason: rejectReason ?? null, elapsedMs: Date.now() - start, rowCount, errorCode: errorCode ? String(errorCode) : null, } as any); } } ``` - [ ] **Step 8: Run schema/query tests** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_sql_guard.test.ts ``` Expected: PASS. 
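
Step 7 binds `maxRows + 1` to the appended `LIMIT ?`, so one extra row signals that more data exists without running a separate `COUNT(*)`. The technique can be shown in isolation; `capRows` is a hypothetical helper name, not part of the plan:

```typescript
// Hypothetical helper illustrating the "fetch one extra row" truncation check.
function capRows<T>(fetched: T[], maxRows: number): { rows: T[]; truncated: boolean } {
  // More rows than the cap means the query would have returned additional data.
  const truncated = fetched.length > maxRows;
  return { rows: truncated ? fetched.slice(0, maxRows) : fetched, truncated };
}

// With maxRows = 2 the query is issued with LIMIT 3; three rows back means truncation.
const capped = capRows([{ id: 1 }, { id: 2 }, { id: 3 }], 2);
const exact = capRows([{ id: 1 }, { id: 2 }], 2);
```

Note that this only detects truncation when the guard appended the limit itself; an explicit `LIMIT` written by the Agent is treated as the intended result size.
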
- [ ] **Step 9: Commit** ```bash git add packages/backend/src/modules/netaclaw/service/mysql_schema.ts packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_schema_mapper.test.ts packages/backend/test/mysql_query_service.test.ts git commit -m "feat: add mysql schema and query services" ``` --- ## Task 5: MySQL Agent Tools And Resolver Wiring **Files:** - Create: `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts` - Modify: `packages/backend/src/modules/netaclaw/tools/catalog.ts` - Modify: `packages/backend/src/modules/netaclaw/tools/manifest.ts` - Modify: `packages/backend/src/modules/netaclaw/service/tool_resolver.ts` - Test: `packages/backend/test/tool_resolver.test.ts` - [ ] **Step 1: Write failing resolver tests** Append to `packages/backend/test/tool_resolver.test.ts`: ```ts it('resolves mysql toolset tools when enabled for an agent', async () => { service.toolRegistry = { all: jest.fn().mockResolvedValue([ { name: 'mysql_list_sources', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' }, { name: 'mysql_schema', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' }, { name: 'mysql_table_sample', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' }, { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' }, ]), } as any; service.skillLoader = {} as any; service.dataSourceService = { listForAgent: jest.fn().mockResolvedValue([]) } as any; service.mysqlIntrospection = {} as any; service.mysqlQuery = {} as any; const result = await service.resolve({ agent: { id: 9, toolsets: ['mysql'], tools: { disabled: [] } } as any, modelCapability: 'text', }); expect(result.toolNames).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']); expect(result.tools.map(tool => tool.name)).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']); }); it('routes 
mysql tools through main process proxy for subagent workers', async () => { service.toolRegistry = { all: jest.fn().mockResolvedValue([ { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' }, ]), } as any; service.skillLoader = {} as any; service.dataSourceService = {} as any; service.mysqlIntrospection = {} as any; service.mysqlQuery = {} as any; const result = await service.resolve({ agent: { tools: { inheritCoreTools: false, enabled: ['mysql_query'], disabled: [] }, } as any, modelCapability: 'text', delegationRole: 'subagent', }); expect(result.toolManifest).toEqual([ expect.objectContaining({ name: 'mysql_query', supportedInWorker: false, workerRoutingHint: 'main-process-proxy', }), ]); }); ``` - [ ] **Step 2: Run resolver test to verify RED** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts ``` Expected: FAIL because mysql tools are not registered or injected. - [ ] **Step 3: Implement MySQL tools** Create `packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts`: ```ts import { Type } from '@sinclair/typebox'; import { AgentToolWithMeta, jsonResult } from '../common.js'; import { registerSchema } from '../catalog.js'; import { NetaClawDataSourceService } from '../../service/data_source.js'; import { MysqlIntrospectionService } from '../../service/mysql_schema.js'; import { MysqlQueryService } from '../../service/mysql_query.js'; export const TOOLSET_MYSQL = 'mysql' as const; const Runtime = Type.Optional(Type.Any()); const SourceParam = Type.Object({ source: Type.String({ description: '数据源名称' }), _netaRuntime: Runtime, }); function readAgentId(params: any): number { const id = params?._netaRuntime?.currentAgent?.id; if (!id) throw new Error('current_agent_id_missing'); return Number(id); } function readUserId(params: any): number | undefined { const id = params?._netaRuntime?.userId; return id === undefined || id === null ? 
undefined : Number(id); } export function createMysqlListSourcesTool(deps: { dataSourceService: NetaClawDataSourceService; }): AgentToolWithMeta<any, unknown> { const Params = Type.Object({ _netaRuntime: Runtime }); return { name: 'mysql_list_sources', label: 'MySQL 数据源列表', description: '列出当前 Agent 授权可用的 MySQL 数据源,不返回连接密码。', parameters: Params, async execute(_id, params) { return jsonResult({ sources: await deps.dataSourceService.listForAgent(readAgentId(params)) }); }, } as any; } export function createMysqlSchemaTool(deps: { dataSourceService: NetaClawDataSourceService; mysqlIntrospection: MysqlIntrospectionService; }): AgentToolWithMeta { const Params = Type.Intersect([SourceParam, Type.Object({ tables: Type.Optional(Type.Array(Type.String())), })]); return { name: 'mysql_schema', label: 'MySQL Schema', description: '读取授权 MySQL 表的字段、索引、主键和外键信息。', parameters: Params, async execute(_id, params) { const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params)); return jsonResult(await deps.mysqlIntrospection.listSchema(source, { tables: params.tables })); }, }; } export function createMysqlTableSampleTool(deps: { dataSourceService: NetaClawDataSourceService; mysqlIntrospection: MysqlIntrospectionService; }): AgentToolWithMeta { const Params = Type.Intersect([SourceParam, Type.Object({ table: Type.String(), columns: Type.Optional(Type.Array(Type.String())), limit: Type.Optional(Type.Number()), })]); return { name: 'mysql_table_sample', label: 'MySQL 表样例', description: '读取授权表的少量样例值,用于判断字段含义和 JOIN key。', parameters: Params, async execute(_id, params) { const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params)); return jsonResult(await deps.mysqlIntrospection.sampleTable(source, { table: params.table, columns: params.columns, limit: params.limit, })); }, }; } export function createMysqlQueryTool(deps: { dataSourceService: NetaClawDataSourceService; mysqlQuery: MysqlQueryService; }): 
AgentToolWithMeta { const Params = Type.Intersect([SourceParam, Type.Object({ sql: Type.String(), })]); return { name: 'mysql_query', label: 'MySQL 查询', description: '执行授权数据源上的只读 SELECT SQL。支持授权表范围内的跨表 JOIN。', parameters: Params, async execute(id, params) { const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params)); return jsonResult(await deps.mysqlQuery.executeReadOnly({ source, sql: params.sql, agentId: readAgentId(params), userId: readUserId(params), toolCallId: id, })); }, }; } for (const name of ['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']) { registerSchema({ name, toolset: TOOLSET_MYSQL, description: name, visibility: 'tool', capability: 'text', isCore: false, canDisable: true, }); } ``` - [ ] **Step 4: Wire catalog, manifest, resolver** Modify `packages/backend/src/modules/netaclaw/tools/catalog.ts`: ```ts export const TOOLSET_MYSQL = 'mysql' as const; import './builtin/mysql.js'; ``` Modify `packages/backend/src/modules/netaclaw/tools/manifest.ts`: ```ts const MAIN_PROCESS_PROXY_TOOLS = new Set([ 'write_file', 'edit', 'patch', 'read_skill', 'read_skill_file', 'skill_manage', 'memory_save', 'memory_recall', 'mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query', ]); export function toToolKind(name: string): SubagentToolManifestItem['kind'] { if (name.startsWith('mysql_')) return 'custom'; if (name.startsWith('memory_')) return 'memory'; if (name.startsWith('read_skill') || name === 'skill_manage') return 'skill'; if (name === 'delegate_task' || name === 'delegate_parallel') return 'delegation'; if (name === 'escalate') return 'admin'; if ( name === 'bash' || name === 'read_file' || name === 'write_file' || name === 'list_dir' || name === 'find_files' || name === 'grep' || name === 'edit' || name === 'patch' || name === 'clarify' ) { return 'builtin'; } return 'custom'; } ``` Modify `packages/backend/src/modules/netaclaw/service/tool_resolver.ts`: - Import services and 
factories:

```ts
import { NetaClawDataSourceService } from './data_source.js';
import { MysqlIntrospectionService } from './mysql_schema.js';
import { MysqlQueryService } from './mysql_query.js';
import {
  createMysqlListSourcesTool,
  createMysqlQueryTool,
  createMysqlSchemaTool,
  createMysqlTableSampleTool,
} from '../tools/builtin/mysql.js';
```

- Add injected services:

```ts
@Inject()
dataSourceService: NetaClawDataSourceService;

@Inject()
mysqlIntrospection: MysqlIntrospectionService;

@Inject()
mysqlQuery: MysqlQueryService;
```

- Add to `getBaseToolMap()`:

```ts
['mysql_list_sources', createMysqlListSourcesTool({ dataSourceService: this.dataSourceService })],
['mysql_schema', createMysqlSchemaTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_table_sample', createMysqlTableSampleTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_query', createMysqlQueryTool({ dataSourceService: this.dataSourceService, mysqlQuery: this.mysqlQuery })],
```

- [ ] **Step 5: Run resolver tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts
```

Expected: PASS.
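
The subagent test in Step 1 asserts only three manifest fields for `mysql_query`: `name`, `supportedInWorker: false`, and `workerRoutingHint: 'main-process-proxy'`. A minimal sketch of how a proxy set could drive those fields is shown below. `ManifestItemSketch`, `PROXIED_TOOLS`, and `toManifestItemSketch` are hypothetical, and the assumption that every non-proxied tool is `worker-local` is a simplification rather than the behavior of `manifest.ts`:

```typescript
// Hypothetical shapes; only the three asserted fields come from the resolver test.
type WorkerRoutingHint = 'worker-local' | 'main-process-proxy';

interface ManifestItemSketch {
  name: string;
  supportedInWorker: boolean;
  workerRoutingHint: WorkerRoutingHint;
}

// MySQL tools need the main process because pools and decrypted secrets live there.
const PROXIED_TOOLS = new Set<string>([
  'mysql_list_sources',
  'mysql_schema',
  'mysql_table_sample',
  'mysql_query',
]);

function toManifestItemSketch(name: string): ManifestItemSketch {
  const proxied = PROXIED_TOOLS.has(name);
  return {
    name,
    supportedInWorker: !proxied,
    // Assumption for this sketch: anything not proxied runs locally in the worker.
    workerRoutingHint: proxied ? 'main-process-proxy' : 'worker-local',
  };
}

const mysqlItem = toManifestItemSketch('mysql_query');
const localItem = toManifestItemSketch('some_local_tool');
```

Keeping the routing decision in one set makes it easy to verify that no `mysql_*` tool can run inside a worker with direct database access.
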
- [ ] **Step 6: Commit** ```bash git add packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts packages/backend/src/modules/netaclaw/tools/catalog.ts packages/backend/src/modules/netaclaw/tools/manifest.ts packages/backend/src/modules/netaclaw/service/tool_resolver.ts packages/backend/test/tool_resolver.test.ts git commit -m "feat: register mysql agent tools" ``` --- ## Task 6: Admin Controller And Prompt Skill **Files:** - Create: `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts` - Create: `packages/backend/skills/data-analyst-mysql/SKILL.md` - [ ] **Step 1: Implement admin controller** Create `packages/backend/src/modules/netaclaw/controller/admin/data_source.ts`: ```ts import { Body, Controller, Get, Inject, Post, Provide, Query } from '@midwayjs/core'; import { NetaClawDataSourceService } from '../../service/data_source.js'; @Provide() @Controller('/admin/netaclaw/data-source') export class NetaClawDataSourceAdminController { @Inject() dataSourceService: NetaClawDataSourceService; @Get('/list') async list(@Query('agentId') agentId?: number) { if (agentId) { return { code: 1000, data: await this.dataSourceService.listForAgent(Number(agentId)) }; } return { code: 1000, data: await this.dataSourceService.listAdminSafe() }; } @Post('/save') async save(@Body() body: any) { return { code: 1000, data: await this.dataSourceService.saveConfig(body) }; } @Post('/delete') async delete(@Body() body: { id: number }) { await this.dataSourceService.delete(Number(body.id)); return { code: 1000, message: 'success' }; } @Post('/test') async test(@Body() body: any) { const result = await this.dataSourceService.testConnection(body); return result.ok ? 
{ code: 1000, data: result } : { code: 1001, message: result.error || 'connection_failed', data: result }; } } ``` - [ ] **Step 2: Add prompt skill** Create `packages/backend/skills/data-analyst-mysql/SKILL.md`: ```md --- name: data-analyst-mysql description: 使用 MySQL 工具进行只读智能问数,支持 schema 分析、跨表 JOIN、口径澄清和 SQL 结果解释。 version: 1.0.0 metadata: skillType: llm tags: [mysql, data-analysis, question-answering] conditions: requires_tools: ["mysql_list_sources", "mysql_schema", "mysql_query"] --- # MySQL 智能问数 你负责使用 MySQL 工具回答业务数据问题。所有数据库访问必须通过 `mysql_*` 工具完成。 ## 工作流程 1. 先调用 `mysql_list_sources` 查看当前 Agent 可用的数据源。 2. 根据用户问题选择数据源;如果无法判断,先澄清。 3. 调用 `mysql_schema` 读取相关表结构、字段、主键、索引和外键。 4. 跨表 JOIN 时优先使用外键;没有外键时,根据字段名、类型、索引和 `mysql_table_sample` 的样例值推断关联。 5. 业务口径不明确时必须先澄清,不要硬猜。 6. 只调用 `mysql_query` 执行只读 SELECT。 ## JOIN 规则 - JOIN 必须有明确的 `ON` 或 `USING` 条件。 - 不确定关联字段时,先解释拟采用的关联并询问用户。 - 若关联来自推断,最终回答必须说明“该关联为推断”。 - 查询超时或被拒绝时,缩小时间范围、减少表数量或向用户澄清。 ## 回答格式 回答必须包含: - 结论。 - SQL。 - 口径说明,特别是时间范围、筛选条件、JOIN 字段。 - 限制说明,例如 LIMIT、样例、脱敏列、推断关联或字段缺失。 不要暴露数据库 host、账号、密码、连接串或未授权表信息。 不要承诺“全量统计”,除非 SQL 口径和过滤范围明确。 ``` - [ ] **Step 3: Run build or focused tests** Run: ```bash pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts test/skill_env_schema.test.ts ``` Expected: PASS. - [ ] **Step 4: Commit** ```bash git add packages/backend/src/modules/netaclaw/controller/admin/data_source.ts packages/backend/skills/data-analyst-mysql/SKILL.md git commit -m "feat: add mysql data source admin api and analyst skill" ``` --- ## Task 7: Verification And Documentation Sync **Files:** - Modify: `docs/code-wiki/entities/tool-system.md` - Modify: `docs/code-wiki/entities/skill-system.md` - Modify: `docs/code-wiki/entities/netaclaw-module.md` - Modify: `docs/code-wiki/comparisons/database-entity-overview.md` - [ ] **Step 1: Update code wiki** Add concise references: - `tool-system.md`: add `mysql_list_sources`, `mysql_schema`, `mysql_table_sample`, `mysql_query` under a `MySQL 问数` toolset. 
- `skill-system.md`: add `data-analyst-mysql` as a prompt skill.
- `netaclaw-module.md`: increment table count and list `netaclaw_data_source`, `netaclaw_data_source_query_audit`.
- `database-entity-overview.md`: add field summary for both new tables.

- [ ] **Step 2: Run all focused backend tests**

Run:

```bash
pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/mysql_sql_guard.test.ts test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_pool_manager.test.ts test/tool_resolver.test.ts test/entity_exports.test.ts
```

Expected: PASS.

- [ ] **Step 3: Run backend build**

Run:

```bash
pnpm --filter @neta/backend build
```

Expected: exit code 0.

- [ ] **Step 4: Inspect final diff**

Run:

```bash
git status --short
git diff --stat
```

Expected: only MySQL question answering implementation files and docs are changed.

- [ ] **Step 5: Commit**

```bash
git add docs/code-wiki packages/backend
git commit -m "docs: update code wiki for mysql question answering"
```

---

## Final Verification Checklist

- [ ] SQL guard rejects `SHOW`, `DESCRIBE`, `EXPLAIN`, comments, semicolons, DML, DDL, `UNION`, CTE, user variables, dangerous functions, blocked tables, unallowed tables, missing JOIN condition, too many JOIN tables.
- [ ] Data source password is AES-GCM encrypted; admin APIs never return password ciphertext/plaintext, and `mysql_list_sources` never returns host, username, password, connection string, or permission details.
- [ ] `mysql_schema` returns only authorized table metadata and marks masked columns.
- [ ] `mysql_table_sample` validates table and column identifiers against schema plus allowlist/blocklist, and rejects masked columns.
- [ ] `mysql_query` rejects masked column references, explicit LIMIT above `maxRows`, missing per-JOIN `ON/USING`, blocked tables, and unallowed tables before querying MySQL.
- [ ] `mysql_query` writes audit for success, rejected, and failed SQL.
- [ ] MySQL tools are visible only when selected by `mysql` toolset or explicit tool config.
- [ ] Subagent worker route for MySQL tools is `main-process-proxy`, not `worker-local`.
- [ ] `data-analyst-mysql` skill is discoverable and condition-gated by MySQL tools.
- [ ] Focused tests pass.
- [ ] Backend build passes.
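
The checklist item about never returning password material or connection details can be checked with plain objects, without a database. The sketch below mirrors the two projections from Task 3 (`toAdminSafe` and `toAgentSummary`). `StoredSourceSketch` and the `*Sketch` functions are hypothetical stand-ins, and the field list is an assumption about the entity rather than its actual definition:

```typescript
// Hypothetical stand-in for the stored entity; the real field list may differ.
interface StoredSourceSketch {
  id: number;
  name: string;
  label: string;
  host: string;
  username: string;
  database: string;
  status: number;
  passwordEncrypted: string | null;
}

// Admin view: drop the ciphertext and expose only whether a password is set.
function toAdminSafeSketch(row: StoredSourceSketch) {
  const { passwordEncrypted, ...rest } = row;
  return { ...rest, hasPassword: !!passwordEncrypted };
}

// Agent view: an explicit allowlist of fields, so new entity columns never leak by default.
function toAgentSummarySketch(row: StoredSourceSketch) {
  return { name: row.name, label: row.label, database: row.database, status: row.status };
}

const stored: StoredSourceSketch = {
  id: 1,
  name: 'sales',
  label: 'Sales DB',
  host: '10.0.0.5',
  username: 'readonly',
  database: 'sales',
  status: 1,
  passwordEncrypted: 'ciphertext-placeholder',
};

const adminView = toAdminSafeSketch(stored);
const agentView = toAgentSummarySketch(stored);
const agentKeys = Object.keys(agentView).sort().join(',');
```

The admin projection removes one field while the Agent projection picks four, which is why the Agent view stays safe even if the entity later gains sensitive columns.
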