GPU_GUARD_MONOREPO/docs/superpowers/plans/2026-05-15-mysql-question-answering.md
2026-05-20 21:39:12 +08:00


MySQL Question Answering Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build a MySQL-only, read-only data question-answering backend path for Neta Agents, with safe data-source configuration, schema/sample/query tools, cross-table JOIN support, query auditing, and a prompt skill.

Architecture: Add a mysql toolset backed by netaclaw_data_source, SecretCryptoService, MysqlPoolManager, schema/query services, and guarded Tool implementations. Security-critical rules live in services and tools; data-analyst-mysql only guides Agent behavior.

Tech Stack: Midway.js, TypeORM, mysql2/promise, TypeBox tool schemas, Jest, NetaClaw Tool Catalog/Resolver/Runtime Policy, prompt Skill.
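
The layering described under Architecture can be sketched as a single call path: authorize the data source, run the SQL guard, execute, and audit each outcome. This is a minimal sketch under assumptions, not the plan's implementation: QueryDeps, runGuardedQuery, and ToolOutcome are hypothetical names, and the dependencies are synchronous here only to keep the example short (the real services are asynchronous).

```typescript
// Hypothetical dependency shapes; the real services are defined later in the plan.
type GuardOutcome = { ok: true; limitedSql: string } | { ok: false; reason: string };

interface QueryDeps {
  isAuthorized(sourceName: string, agentId: number): boolean;
  guard(sql: string): GuardOutcome;
  execute(sourceName: string, sql: string): unknown[];
  audit(entry: { status: 'success' | 'rejected'; reason?: string }): void;
}

type ToolOutcome = { ok: true; rows: unknown[] } | { ok: false; reason: string };

// Every check runs in code, so a prompt cannot skip authorization or the guard.
function runGuardedQuery(
  deps: QueryDeps,
  req: { sourceName: string; agentId: number; sql: string }
): ToolOutcome {
  if (!deps.isAuthorized(req.sourceName, req.agentId)) {
    deps.audit({ status: 'rejected', reason: 'data_source_not_authorized' });
    return { ok: false, reason: 'data_source_not_authorized' };
  }
  const checked = deps.guard(req.sql);
  if (!checked.ok) {
    deps.audit({ status: 'rejected', reason: checked.reason });
    return { ok: false, reason: checked.reason };
  }
  // Only the guard's rewritten SQL reaches the database layer.
  const rows = deps.execute(req.sourceName, checked.limitedSql);
  deps.audit({ status: 'success' });
  return { ok: true, rows };
}
```

The prompt skill can suggest good queries, but in this shape a rejected request never reaches execute, whatever the Agent was told.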


File Structure

Create:

  • packages/backend/src/modules/netaclaw/entity/data_source.ts — MySQL data-source configuration entity.
  • packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts — query audit entity.
  • packages/backend/src/modules/netaclaw/service/secret_crypto.ts — shared AES-256-GCM helper for new encrypted secrets.
  • packages/backend/src/modules/netaclaw/service/data_source.ts — admin/runtime data-source lookup, authorization, save, and test connection service.
  • packages/backend/src/modules/netaclaw/service/mysql_pool.ts — cached mysql2 pool manager.
  • packages/backend/src/modules/netaclaw/service/mysql_schema.ts — information_schema mapping and schema/sample helpers.
  • packages/backend/src/modules/netaclaw/service/mysql_query.ts — read-only SQL execution, masked-column rejection, row limits, audit.
  • packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts — mysql_list_sources, mysql_schema, mysql_table_sample, mysql_query.
  • packages/backend/src/modules/netaclaw/controller/admin/data_source.ts — admin APIs for data-source list/save/test/delete.
  • packages/backend/skills/data-analyst-mysql/SKILL.md — prompt skill for MySQL data analysis.
  • packages/backend/test/secret_crypto.test.ts
  • packages/backend/test/mysql_sql_guard.test.ts
  • packages/backend/test/mysql_schema_mapper.test.ts
  • packages/backend/test/mysql_query_service.test.ts
  • packages/backend/test/mysql_pool_manager.test.ts
  • packages/backend/test/data_source_projection.test.ts

Modify:

  • packages/backend/src/entities.ts — add data source and query audit entity imports to the generated entity list. This file is marked generated, but existing tests assert it directly, so update it for this change.
  • packages/backend/src/modules/netaclaw/tools/catalog.ts — add TOOLSET_MYSQL and import ./builtin/mysql.js.
  • packages/backend/src/modules/netaclaw/tools/manifest.ts — route mysql_* tools as main-process proxy, classify as custom/text/parallel.
  • packages/backend/src/modules/netaclaw/service/tool_resolver.ts — inject MySQL tool dependencies into runtime tool map.
  • packages/backend/test/entity_exports.test.ts — assert new entities are exported.
  • packages/backend/test/tool_resolver.test.ts — assert mysql toolset resolution and subagent route.

Do not implement frontend UI in this pass. The admin controller is enough for Phase 1/2 backend configuration.


Task 1: Entities And Secret Crypto

Files:

  • Create: packages/backend/src/modules/netaclaw/entity/data_source.ts

  • Create: packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts

  • Create: packages/backend/src/modules/netaclaw/service/secret_crypto.ts

  • Modify: packages/backend/src/entities.ts

  • Modify: packages/backend/test/entity_exports.test.ts

  • Test: packages/backend/test/secret_crypto.test.ts

  • Test: packages/backend/test/entity_exports.test.ts

  • Test: packages/backend/test/data_source_projection.test.ts

  • Step 1: Write the failing secret crypto test

Add packages/backend/test/secret_crypto.test.ts:

import { SecretCryptoService } from '../src/modules/netaclaw/service/secret_crypto.js';

describe('SecretCryptoService', () => {
  const originalNetaSecret = process.env.NETA_SECRET_KEY;
  const originalAppSecret = process.env.APP_SECRET;

  beforeEach(() => {
    process.env.NETA_SECRET_KEY = 'unit-test-secret-key';
    delete process.env.APP_SECRET;
  });

  afterEach(() => {
    if (originalNetaSecret === undefined) delete process.env.NETA_SECRET_KEY;
    else process.env.NETA_SECRET_KEY = originalNetaSecret;
    if (originalAppSecret === undefined) delete process.env.APP_SECRET;
    else process.env.APP_SECRET = originalAppSecret;
  });

  it('encrypts text without embedding the plaintext', () => {
    const service = new SecretCryptoService();

    const encrypted = service.encryptText('mysql-password-123');
    const envelope = JSON.parse(Buffer.from(encrypted, 'base64').toString('utf8'));

    expect(encrypted).not.toContain('mysql-password-123');
    expect(envelope).toEqual(expect.objectContaining({ v: 1, alg: 'aes-256-gcm' }));
    expect(envelope.iv).toEqual(expect.any(String));
    expect(envelope.tag).toEqual(expect.any(String));
    expect(envelope.ct).toEqual(expect.any(String));
    expect(service.decryptText(encrypted)).toBe('mysql-password-123');
  });

  it('encrypts and decrypts JSON values', () => {
    const service = new SecretCryptoService();

    const encrypted = service.encryptJson({ password: 'secret', token: 'abc' });

    expect(encrypted).not.toContain('secret');
    expect(service.decryptJson(encrypted)).toEqual({ password: 'secret', token: 'abc' });
  });

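  it('rejects tampered ciphertext', () => {
    const service = new SecretCryptoService();
    const encrypted = service.encryptText('secret');
    // Corrupt only the ct field so the failure comes from GCM authentication,
    // not from a broken base64/JSON envelope.
    const envelope = JSON.parse(Buffer.from(encrypted, 'base64').toString('utf8'));
    const ct = Buffer.from(envelope.ct, 'base64');
    ct[0] ^= 0x01;
    const tampered = Buffer.from(JSON.stringify({ ...envelope, ct: ct.toString('base64') }), 'utf8').toString('base64');

    expect(() => service.decryptText(tampered)).toThrow();
  });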
});

  • Step 2: Run test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts

Expected: FAIL because secret_crypto.ts does not exist.

  • Step 3: Implement SecretCryptoService

Create packages/backend/src/modules/netaclaw/service/secret_crypto.ts:

import { Provide, Scope, ScopeEnum } from '@midwayjs/core';
import * as crypto from 'crypto';

@Provide()
@Scope(ScopeEnum.Singleton)
export class SecretCryptoService {
  private readonly algorithm = 'aes-256-gcm' as const;
  private readonly ivLength = 12;

  private deriveKey(): Buffer {
    const raw = process.env.NETA_SECRET_KEY || process.env.APP_SECRET;
    if (!raw) {
      throw new Error('NETA_SECRET_KEY or APP_SECRET is required for secret encryption');
    }
    return crypto.createHash('sha256').update(raw).digest();
  }

  encryptText(value: string): string {
    const iv = crypto.randomBytes(this.ivLength);
    const cipher = crypto.createCipheriv(this.algorithm, this.deriveKey(), iv);
    const encrypted = Buffer.concat([cipher.update(value, 'utf8'), cipher.final()]);
    const tag = cipher.getAuthTag();
    return Buffer.from(JSON.stringify({
      v: 1,
      alg: this.algorithm,
      iv: iv.toString('base64'),
      tag: tag.toString('base64'),
      ct: encrypted.toString('base64'),
    }), 'utf8').toString('base64');
  }

  decryptText(ciphertext: string): string {
    const envelope = JSON.parse(Buffer.from(ciphertext, 'base64').toString('utf8'));
    if (!envelope || envelope.v !== 1 || envelope.alg !== this.algorithm) {
      throw new Error('Unsupported ciphertext envelope');
    }
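    const iv = Buffer.from(envelope.iv, 'base64');
    const tag = Buffer.from(envelope.tag, 'base64');
    const encrypted = Buffer.from(envelope.ct, 'base64');
    // Reject truncated IVs or auth tags so a shortened tag cannot weaken authentication.
    if (iv.length !== this.ivLength || tag.length !== 16) {
      throw new Error('Malformed ciphertext envelope');
    }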
    const decipher = crypto.createDecipheriv(this.algorithm, this.deriveKey(), iv);
    decipher.setAuthTag(tag);
    return Buffer.concat([decipher.update(encrypted), decipher.final()]).toString('utf8');
  }

  encryptJson(value: Record<string, string>): string {
    return this.encryptText(JSON.stringify(value));
  }

  decryptJson(ciphertext: string): Record<string, string> {
    const parsed = JSON.parse(this.decryptText(ciphertext));
    if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
      throw new Error('Invalid encrypted JSON payload');
    }
    return Object.fromEntries(Object.entries(parsed).map(([key, value]) => [key, String(value)]));
  }
}

  • Step 4: Run test to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts

Expected: PASS.
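
To make the envelope's guarantees concrete, here is a stand-alone sketch of the same { v, alg, iv, tag, ct } format using only node:crypto. The function names seal, open, and flipFirstCiphertextBit are hypothetical and exist only for this illustration. It shows that a one-bit change in ct, or the wrong secret, makes decryption throw at the authentication step.

```typescript
import { createCipheriv, createDecipheriv, createHash, randomBytes } from 'node:crypto';

// Hypothetical mirror of the plan's envelope fields.
type Envelope = { v: 1; alg: 'aes-256-gcm'; iv: string; tag: string; ct: string };

// Same derivation as the plan: SHA-256 of the configured secret gives a 32-byte key.
function deriveKey(secret: string): Buffer {
  return createHash('sha256').update(secret).digest();
}

function seal(plaintext: string, secret: string): Envelope {
  const iv = randomBytes(12); // fresh 96-bit IV per message
  const cipher = createCipheriv('aes-256-gcm', deriveKey(secret), iv);
  const ct = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  return {
    v: 1,
    alg: 'aes-256-gcm',
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    ct: ct.toString('base64'),
  };
}

function open(envelope: Envelope, secret: string): string {
  const decipher = createDecipheriv('aes-256-gcm', deriveKey(secret), Buffer.from(envelope.iv, 'base64'));
  decipher.setAuthTag(Buffer.from(envelope.tag, 'base64'));
  // final() throws when the tag does not match the key, IV, and ciphertext.
  return Buffer.concat([decipher.update(Buffer.from(envelope.ct, 'base64')), decipher.final()]).toString('utf8');
}

// Flip one bit of the ciphertext while leaving the envelope structurally valid.
// Assumes a non-empty ciphertext.
function flipFirstCiphertextBit(envelope: Envelope): Envelope {
  const ct = Buffer.from(envelope.ct, 'base64');
  ct.writeUInt8(ct.readUInt8(0) ^ 0x01, 0);
  return { ...envelope, ct: ct.toString('base64') };
}
```

Because the key is derived from the environment secret on every call, rotating NETA_SECRET_KEY without re-encrypting stored passwords would make existing envelopes fail in the same way as the wrong-secret case.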

  • Step 5: Write failing entity export test

Modify packages/backend/test/entity_exports.test.ts:

import { entities } from '../src/entities.js';
import { NetaClawAgentSessionEntity } from '../src/modules/netaclaw/entity/agent_session.js';
import { NetaClawAgentSessionEntryEntity } from '../src/modules/netaclaw/entity/agent_session_entry.js';
import { NetaClawDataSourceEntity } from '../src/modules/netaclaw/entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../src/modules/netaclaw/entity/data_source_query_audit.js';

describe('entities exports', () => {
  it('includes SubagentSession entity', () => {
    const entityNames = entities
      .map(e => (typeof e === 'function' ? e.name : ''))
      .filter(Boolean);

    expect(entityNames.some(n => n.includes('SubagentSession'))).toBe(true);
  });

  it('includes session-tree MySQL entities', () => {
    expect(entities).toContain(NetaClawAgentSessionEntity);
    expect(entities).toContain(NetaClawAgentSessionEntryEntity);
  });

  it('includes MySQL question answering entities', () => {
    expect(entities).toContain(NetaClawDataSourceEntity);
    expect(entities).toContain(NetaClawDataSourceQueryAuditEntity);
  });
});

  • Step 6: Run test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/entity_exports.test.ts

Expected: FAIL because entity files do not exist.

  • Step 7: Implement data source entities

Create packages/backend/src/modules/netaclaw/entity/data_source.ts:

import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

export interface NetaClawDataSourceExtra {
  ssl?: boolean | Record<string, unknown>;
  connectTimeout?: number;
  queryTimeoutMs?: number;
  maxRows?: number;
  allowedTables?: string[];
  blockedTables?: string[];
  maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
  schemaVisibility?: 'allowed-only' | 'all-names-only';
  maxJoinTables?: number;
  poolConnectionLimit?: number;
}

@Entity('netaclaw_data_source')
export class NetaClawDataSourceEntity extends BaseEntity {
  @Index({ unique: true })
  @Column({ comment: '数据源唯一名', length: 100 })
  name: string;

  @Column({ comment: '显示名称', length: 200, nullable: true })
  label: string;

  @Index()
  @Column({ comment: '数据源类型', length: 20, default: 'mysql' })
  type: string;

  @Column({ comment: 'MySQL host', length: 255 })
  host: string;

  @Column({ comment: 'MySQL port', default: 3306 })
  port: number;

  @Column({ comment: '默认数据库', length: 128 })
  database: string;

  @Column({ comment: '连接用户名', length: 255 })
  username: string;

  @Column({ comment: '加密后的连接密码', type: 'text', nullable: true })
  passwordEncrypted: string;

  @Column({ comment: '是否只读', default: 1 })
  readonly: number;

  @Index()
  @Column({ comment: '状态 0=禁用 1=启用', default: 1 })
  status: number;

  @Column({ comment: '授权 Agent ID 列表', type: 'json', nullable: true })
  allowedAgentIds: number[];

  @Column({ comment: '扩展配置', type: 'json', nullable: true })
  extra: NetaClawDataSourceExtra;
}

Create packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts:

import { BaseEntity } from '../../base/entity/base.js';
import { Column, Entity, Index } from 'typeorm';

@Entity('netaclaw_data_source_query_audit')
export class NetaClawDataSourceQueryAuditEntity extends BaseEntity {
  @Index()
  @Column({ comment: '数据源 ID' })
  dataSourceId: number;

  @Index()
  @Column({ comment: 'Agent ID', nullable: true })
  agentId: number;

  @Index()
  @Column({ comment: '用户 ID', nullable: true })
  userId: number;

  @Column({ comment: '工具调用 ID', length: 100, nullable: true })
  toolCallId: string;

  @Column({ comment: 'SQL SHA-256', length: 64 })
  sqlHash: string;

  @Column({ comment: 'SQL 摘要', type: 'text', nullable: true })
  sqlPreview: string;

  @Index()
  @Column({ comment: '状态 success/rejected/failed', length: 20 })
  status: string;

  @Column({ comment: '拒绝原因', length: 100, nullable: true })
  rejectReason: string;

  @Column({ comment: '耗时 ms', nullable: true })
  elapsedMs: number;

  @Column({ comment: '返回行数', nullable: true })
  rowCount: number;

  @Column({ comment: '错误代码', length: 50, nullable: true })
  errorCode: string;
}

Modify packages/backend/src/entities.ts by adding imports after existing netaclaw imports:

import * as entity51 from './modules/netaclaw/entity/data_source';
import * as entity52 from './modules/netaclaw/entity/data_source_query_audit';

and append:

  ...Object.values(entity51),
  ...Object.values(entity52),

  • Step 8: Run entity and crypto tests to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts

Expected: PASS.
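
For orientation, this is what a value stored in the extra JSON column might look like, plus a small consistency check. It is a sketch only: the table and column names are invented, DataSourceExtraSample copies a subset of NetaClawDataSourceExtra, and findInvalidMaskedColumnKeys is a hypothetical helper that the plan does not define. It assumes maskedColumns keys use the "table.column" form seen in the guard tests.

```typescript
type MaskMode = 'hash' | 'partial' | 'redact';

// Subset of the plan's NetaClawDataSourceExtra, redeclared so the sketch is self-contained.
interface DataSourceExtraSample {
  allowedTables?: string[];
  blockedTables?: string[];
  maskedColumns?: Record<string, MaskMode>;
  maxRows?: number;
  maxJoinTables?: number;
  queryTimeoutMs?: number;
}

// Illustrative values only.
const sampleExtra: DataSourceExtraSample = {
  allowedTables: ['orders', 'customers'],
  blockedTables: ['customer_secrets'],
  maskedColumns: { 'customers.email': 'redact', 'customers.phone': 'partial' },
  maxRows: 200,
  maxJoinTables: 4,
  queryTimeoutMs: 10000,
};

// Hypothetical helper: return maskedColumns keys that are not "table.column"
// or that name a table missing from allowedTables.
function findInvalidMaskedColumnKeys(extra: DataSourceExtraSample): string[] {
  const allowed = new Set((extra.allowedTables ?? []).map(table => table.toLowerCase()));
  return Object.keys(extra.maskedColumns ?? {}).filter(key => {
    const parts = key.toLowerCase().split('.');
    const [table, column] = parts;
    return parts.length !== 2 || !table || !column || !allowed.has(table);
  });
}
```

A check like this could run when an admin saves a data source, so that a mistyped key is reported instead of silently masking nothing.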

  • Step 9: Write failing data source projection test

Add packages/backend/test/data_source_projection.test.ts:

import { NetaClawDataSourceService } from '../src/modules/netaclaw/service/data_source.js';

describe('NetaClawDataSourceService projections', () => {
  const source = {
    id: 1,
    name: 'sales_prod',
    label: '销售库',
    type: 'mysql',
    host: '127.0.0.1',
    port: 3306,
    database: 'sales',
    username: 'root',
    passwordEncrypted: 'ciphertext',
    readonly: 1,
    status: 1,
    allowedAgentIds: [9],
    extra: { allowedTables: ['orders'] },
  } as any;

  it('admin projection hides passwords but keeps connection metadata', () => {
    const service = new NetaClawDataSourceService();

    expect(service.toAdminSafe(source)).toEqual(expect.objectContaining({
      name: 'sales_prod',
      host: '127.0.0.1',
      username: 'root',
      hasPassword: true,
    }));
    expect(service.toAdminSafe(source)).not.toHaveProperty('passwordEncrypted');
  });

  it('agent projection hides host username password and permission details', () => {
    const service = new NetaClawDataSourceService();

    expect(service.toAgentSummary(source)).toEqual({
      name: 'sales_prod',
      label: '销售库',
      database: 'sales',
      status: 1,
    });
  });
});

  • Step 10: Run projection test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/data_source_projection.test.ts

Expected: FAIL because NetaClawDataSourceService, and therefore toAdminSafe and toAgentSummary, does not exist yet.

  • Step 11: Implement safe projections

Add these types to packages/backend/src/modules/netaclaw/service/data_source.ts (Task 3, Step 5 lists the complete service):

export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;

Add the projection helpers to NetaClawDataSourceService, replacing any existing unsafe projection helper:

toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
  const { passwordEncrypted, ...rest } = entity as any;
  return { ...rest, hasPassword: !!passwordEncrypted };
}

toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
  return {
    name: entity.name,
    label: entity.label,
    database: entity.database,
    status: entity.status,
  };
}

Update listAdminSafe() and listForAgent() to return the right projections:

async listAdminSafe(): Promise<AdminSafeDataSource[]> {
  const rows = await this.repo.find({ order: { id: 'DESC' } as any });
  return rows.map(row => this.toAdminSafe(row));
}

async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
  const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
  return rows
    .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
    .map(row => this.toAgentSummary(row));
}

Update saveConfig() to return toAdminSafe(saved).

  • Step 12: Run tests to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/entity_exports.test.ts test/data_source_projection.test.ts

Expected: PASS.
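
The agent-facing listing combines two rules: deny by default when allowedAgentIds is missing, and expose only four fields. A minimal stand-alone sketch of that behavior follows; SourceRow, isAgentAuthorized, and summarizeForAgent are hypothetical names used for illustration, and the status filter is applied in memory here, whereas the plan applies it in the repository query.

```typescript
// Hypothetical row shape mirroring the entity fields relevant to projection.
interface SourceRow {
  name: string;
  label: string;
  database: string;
  status: number;
  host: string;
  username: string;
  passwordEncrypted: string;
  allowedAgentIds: number[] | null;
}

type AgentSummary = Pick<SourceRow, 'name' | 'label' | 'database' | 'status'>;

// A null or non-array allow-list authorizes nobody.
function isAgentAuthorized(row: SourceRow, agentId: number): boolean {
  return Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId);
}

// Build each summary from named fields, so a column added to the entity later
// is not exposed to agents by accident.
function summarizeForAgent(rows: SourceRow[], agentId: number): AgentSummary[] {
  return rows
    .filter(row => row.status === 1 && isAgentAuthorized(row, agentId))
    .map(({ name, label, database, status }) => ({ name, label, database, status }));
}
```

Picking fields explicitly is the reason the agent projection is an allow-list, while the admin projection removes a single known-sensitive field.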

  • Step 13: Commit

git add packages/backend/src/modules/netaclaw/entity/data_source.ts packages/backend/src/modules/netaclaw/entity/data_source_query_audit.ts packages/backend/src/modules/netaclaw/service/secret_crypto.ts packages/backend/src/modules/netaclaw/service/data_source.ts packages/backend/src/entities.ts packages/backend/test/secret_crypto.test.ts packages/backend/test/entity_exports.test.ts packages/backend/test/data_source_projection.test.ts
git commit -m "feat: add mysql data source entities and secret crypto"

Task 2: SQL Guard

Files:

  • Create: packages/backend/src/modules/netaclaw/service/mysql_query.ts

  • Test: packages/backend/test/mysql_sql_guard.test.ts

  • Step 1: Write failing SQL guard tests

Add packages/backend/test/mysql_sql_guard.test.ts:

import { validateMysqlReadOnlySql } from '../src/modules/netaclaw/service/mysql_query.js';

describe('validateMysqlReadOnlySql', () => {
  const baseOptions = {
    allowedTables: ['orders', 'customers'],
    blockedTables: [],
    maxJoinTables: 4,
    maxRows: 200,
    maskedColumns: {},
  };

  it('accepts a SELECT with an explicit JOIN condition', () => {
    expect(validateMysqlReadOnlySql(
      'SELECT c.name, COUNT(*) FROM customers c JOIN orders o ON o.customer_id = c.id GROUP BY c.name LIMIT 20',
      baseOptions
    )).toEqual(expect.objectContaining({
      ok: true,
      tables: expect.arrayContaining(['customers', 'orders']),
    }));
  });

  it.each([
    'SHOW TABLES',
    'DESCRIBE orders',
    'EXPLAIN SELECT * FROM orders',
    'UPDATE orders SET amount = 1',
    'DELETE FROM orders',
    'DROP TABLE orders',
    'SELECT * FROM orders; SELECT * FROM customers',
    'SELECT * FROM orders -- comment',
    'SELECT * FROM orders # comment',
    'SELECT * FROM orders /* comment */',
    'WITH recent AS (SELECT * FROM orders) SELECT * FROM recent',
    'SELECT * FROM orders UNION SELECT * FROM customers',
    'SELECT @x := id FROM orders',
    'SELECT SLEEP(1)',
    'SELECT * INTO OUTFILE "/tmp/a" FROM orders',
  ])('rejects unsafe SQL: %s', sql => {
    const result = validateMysqlReadOnlySql(sql, baseOptions);
    expect(result.ok).toBe(false);
  });

  it('rejects unallowed tables', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM payments LIMIT 10', baseOptions);

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_not_allowed',
    }));
  });

  it('rejects blocked tables even when allowed', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10', {
      allowedTables: ['orders'],
      blockedTables: ['orders'],
      maxJoinTables: 4,
    });

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'table_blocked',
    }));
  });

  it('rejects explicit joins without ON or USING', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers JOIN orders LIMIT 10',
      baseOptions
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects more joined tables than allowed', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM a JOIN b ON b.a_id = a.id JOIN c ON c.b_id = b.id LIMIT 10',
      { allowedTables: ['a', 'b', 'c'], blockedTables: [], maxJoinTables: 2, maxRows: 200, maskedColumns: {} }
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'too_many_join_tables',
    }));
  });

  it('rejects a later JOIN that lacks its own ON or USING clause', () => {
    const result = validateMysqlReadOnlySql(
      'SELECT * FROM customers c JOIN orders o ON o.customer_id = c.id JOIN invoices i LIMIT 10',
      { allowedTables: ['customers', 'orders', 'invoices'], blockedTables: [], maxJoinTables: 4, maxRows: 200, maskedColumns: {} }
    );

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'join_condition_required',
    }));
  });

  it('rejects explicit LIMIT values above maxRows', () => {
    const result = validateMysqlReadOnlySql('SELECT * FROM orders LIMIT 10000', baseOptions);

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'limit_exceeds_max_rows',
    }));
  });

  it('rejects SQL that references masked columns', () => {
    const result = validateMysqlReadOnlySql('SELECT email AS e FROM customers LIMIT 10', {
      allowedTables: ['customers'],
      blockedTables: [],
      maxJoinTables: 4,
      maxRows: 200,
      maskedColumns: { 'customers.email': 'redact' },
    });

    expect(result).toEqual(expect.objectContaining({
      ok: false,
      reason: 'masked_column_denied',
    }));
  });
});

  • Step 2: Run test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts

Expected: FAIL because mysql_query.ts does not export the guard.
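
The allow-list cases above use unqualified table names only. If a guard compares just the last segment of a qualified name, a reference such as other_db.orders would be treated as orders. The sketch below shows one conservative way to surface database-qualified references so a caller can reject those outside the configured database. Both helpers are hypothetical and are not part of the plan; the regex covers only simple identifiers that follow FROM or JOIN, and lowercasing is a simplification because MySQL identifier case sensitivity depends on the server platform.

```typescript
interface QualifiedTableRef {
  schema: string;
  table: string;
}

// Hypothetical helper: collect db.table references that follow FROM or JOIN.
function findQualifiedTableRefs(sql: string): QualifiedTableRef[] {
  const refs: QualifiedTableRef[] = [];
  const pattern = /\b(?:from|join)\s+`?([a-zA-Z0-9_]+)`?\s*\.\s*`?([a-zA-Z0-9_]+)`?/gi;
  let match: RegExpExecArray | null;
  while ((match = pattern.exec(sql)) !== null) {
    refs.push({ schema: String(match[1]).toLowerCase(), table: String(match[2]).toLowerCase() });
  }
  return refs;
}

// True when every qualified reference targets the configured default database.
// Unqualified references are not inspected here.
function onlyUsesDatabase(sql: string, database: string): boolean {
  const expected = database.toLowerCase();
  return findQualifiedTableRefs(sql).every(ref => ref.schema === expected);
}
```

A check like this only adds rejections, so it can sit in front of the allow-list comparison without loosening it.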

  • Step 3: Implement SQL guard

Create the initial packages/backend/src/modules/netaclaw/service/mysql_query.ts with these exports:

export type MysqlSqlGuardOptions = {
  allowedTables?: string[];
  blockedTables?: string[];
  maxJoinTables?: number;
  maxRows?: number;
  maskedColumns?: Record<string, 'hash' | 'partial' | 'redact'>;
};

export type MysqlSqlGuardResult =
  | { ok: true; tables: string[]; limitedSql: string }
  | { ok: false; reason: string; tables?: string[] };

const DENIED_PATTERNS: Array<[RegExp, string]> = [
  [/;/, 'multiple_statements_denied'],
  [/(--|#|\/\*|\*\/)/, 'comments_denied'],
  [/^\s*(show|describe|desc|explain)\b/i, 'metadata_sql_denied'],
  [/^\s*(insert|update|delete|replace|create|alter|drop|truncate|call|grant|revoke|start|begin|commit|rollback)\b/i, 'non_select_denied'],
  [/\bunion\b/i, 'union_denied'],
  [/^\s*with\b/i, 'cte_denied'],
  [/@[a-zA-Z0-9_]+/, 'user_variable_denied'],
  [/\btemporary\b/i, 'temporary_table_denied'],
  [/\b(load_file|sleep|benchmark)\s*\(/i, 'dangerous_function_denied'],
  [/\binto\s+(out|dump)?file\b/i, 'file_output_denied'],
  [/\bfrom\s+[^,\s]+(?:\s+\w+)?\s*,\s*[^,\s]+/i, 'implicit_join_denied'],
];

function normalizeIdentifier(value: string): string {
  return value.replace(/`/g, '').split('.').pop()!.toLowerCase();
}

function stripBackticks(value: string): string {
  return value.replace(/`/g, '').toLowerCase();
}

export function extractMysqlTables(sql: string): string[] {
  const tables = new Set<string>();
  const tablePattern = /\b(?:from|join)\s+(`?[a-zA-Z0-9_]+`?(?:\.`?[a-zA-Z0-9_]+`?)?)/gi;
  let match: RegExpExecArray | null;
  while ((match = tablePattern.exec(sql))) {
    tables.add(normalizeIdentifier(match[1]));
  }
  return [...tables];
}

export function validateMysqlReadOnlySql(sql: string, options: MysqlSqlGuardOptions = {}): MysqlSqlGuardResult {
  const trimmed = String(sql || '').trim();
  if (!trimmed) return { ok: false, reason: 'sql_empty' };
  if (!/^\s*select\b/i.test(trimmed)) return { ok: false, reason: 'only_select_allowed' };

  for (const [pattern, reason] of DENIED_PATTERNS) {
    if (pattern.test(trimmed)) return { ok: false, reason };
  }

  if (!everyJoinHasCondition(trimmed)) {
    return { ok: false, reason: 'join_condition_required' };
  }

  const tables = extractMysqlTables(trimmed);
  const allowed = new Set((options.allowedTables || []).map(item => normalizeIdentifier(item)));
  const blocked = new Set((options.blockedTables || []).map(item => normalizeIdentifier(item)));

  for (const table of tables) {
    if (blocked.has(table)) return { ok: false, reason: 'table_blocked', tables };
    if (allowed.size === 0 || !allowed.has(table)) return { ok: false, reason: 'table_not_allowed', tables };
  }

  const maxJoinTables = Math.max(1, Math.min(options.maxJoinTables ?? 4, 6));
  if (tables.length > maxJoinTables) {
    return { ok: false, reason: 'too_many_join_tables', tables };
  }

  const maxRows = Math.max(1, Math.min(options.maxRows ?? 200, 1000));
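  // MySQL also accepts "LIMIT offset, count"; compare the row count, not the offset.
  const explicitLimit = /\blimit\s+(\d+)(?:\s*,\s*(\d+))?/i.exec(trimmed);
  if (explicitLimit && Number(explicitLimit[2] ?? explicitLimit[1]) > maxRows) {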
    return { ok: false, reason: 'limit_exceeds_max_rows', tables };
  }

  const normalizedSql = stripBackticks(trimmed);
  for (const key of Object.keys(options.maskedColumns || {})) {
    const [table, column] = key.toLowerCase().split('.');
    if (tables.includes(table) && new RegExp(`\\b${column}\\b`, 'i').test(normalizedSql)) {
      return { ok: false, reason: 'masked_column_denied', tables };
    }
  }

  const limitedSql = /\blimit\s+\d+/i.test(trimmed)
    ? trimmed
    : `SELECT * FROM (${trimmed}) AS neta_limited_query LIMIT ?`;

  return { ok: true, tables, limitedSql };
}

function everyJoinHasCondition(sql: string): boolean {
  const joinPattern = /\bjoin\b/gi;
  let match: RegExpExecArray | null;
  while ((match = joinPattern.exec(sql))) {
    const rest = sql.slice(match.index + match[0].length);
    const boundary = /\b(join|where|group\s+by|order\s+by|limit)\b/i.exec(rest);
    const segment = boundary ? rest.slice(0, boundary.index) : rest;
    if (!/\b(on|using)\b/i.test(segment)) return false;
  }
  return true;
}

  • Step 4: Run test to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_sql_guard.test.ts

Expected: PASS.
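
Because the guard returns a discriminated union, callers can narrow on ok and translate a rejection directly into audit columns (status, rejectReason, sqlPreview). The following is a sketch under assumptions: AuditFields is a hypothetical subset of the audit entity, and previewSql and auditFieldsForRejection are illustrative helpers that the plan does not name. It also assumes accepted SQL is audited after execution, which is why the helper returns null in that case.

```typescript
// Same shape as the plan's MysqlSqlGuardResult.
type GuardResult =
  | { ok: true; tables: string[]; limitedSql: string }
  | { ok: false; reason: string; tables?: string[] };

// Hypothetical subset of the audit entity's columns.
interface AuditFields {
  status: 'success' | 'rejected' | 'failed';
  rejectReason: string | null;
  sqlPreview: string;
}

// Collapse whitespace and cap the length so the audit row stays small.
function previewSql(sql: string, maxLength = 200): string {
  const collapsed = sql.replace(/\s+/g, ' ').trim();
  return collapsed.length > maxLength ? collapsed.slice(0, maxLength) + '...' : collapsed;
}

function auditFieldsForRejection(sql: string, result: GuardResult): AuditFields | null {
  if (result.ok) return null; // assumption: accepted SQL is audited after execution
  return { status: 'rejected', rejectReason: result.reason, sqlPreview: previewSql(sql) };
}
```

Keeping reason as a short machine-readable code, as the guard does, makes rejected queries easy to group in the audit table.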

  • Step 5: Commit

git add packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_sql_guard.test.ts
git commit -m "feat: add mysql read only sql guard"

Task 3: Data Source Service And Pool Manager

Files:

  • Create: packages/backend/src/modules/netaclaw/service/data_source.ts

  • Create: packages/backend/src/modules/netaclaw/service/mysql_pool.ts

  • Modify: packages/backend/src/modules/netaclaw/service/mysql_query.ts

  • Test: packages/backend/test/mysql_pool_manager.test.ts

  • Step 1: Write failing pool manager tests

Add packages/backend/test/mysql_pool_manager.test.ts:

import { MysqlPoolManager } from '../src/modules/netaclaw/service/mysql_pool.js';

describe('MysqlPoolManager', () => {
  it('reuses pools by data source id and clamps connectionLimit', async () => {
    const created: any[] = [];
    const createPool = jest.fn((config: any) => {
      const pool = { config, end: jest.fn().mockResolvedValue(undefined) };
      created.push(pool);
      return pool;
    });
    const manager = new MysqlPoolManager(createPool as any);
    const source: any = {
      id: 7,
      host: 'localhost',
      port: 3306,
      database: 'sales',
      username: 'root',
      passwordEncrypted: 'encrypted',
      extra: { poolConnectionLimit: 20, connectTimeout: 1000 },
    };

    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

    const first = await manager.getPool(source);
    const second = await manager.getPool(source);

    expect(first).toBe(second);
    expect(createPool).toHaveBeenCalledTimes(1);
    expect(created[0].config.connectionLimit).toBe(10);
    expect(created[0].config.password).toBe('plain');
  });

  it('closes and forgets a cached pool', async () => {
    const end = jest.fn().mockResolvedValue(undefined);
    const createPool = jest.fn(() => ({ end }));
    const manager = new MysqlPoolManager(createPool as any);
    manager.secretCrypto = { decryptText: jest.fn().mockReturnValue('plain') } as any;

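    const source = { id: 1, host: 'h', port: 3306, database: 'd', username: 'u', passwordEncrypted: 'p', extra: {} } as any;
    await manager.getPool(source);
    await manager.closePool(1);
    await manager.getPool(source);

    expect(end).toHaveBeenCalledTimes(1);
    // A second getPool after closePool must build a new pool, which proves the cache entry was dropped.
    expect(createPool).toHaveBeenCalledTimes(2);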
  });
});

  • Step 2: Run test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts

Expected: FAIL because mysql_pool.ts does not exist.

  • Step 3: Implement pool manager

Create packages/backend/src/modules/netaclaw/service/mysql_pool.ts:

import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import mysql from 'mysql2/promise';
import type { Pool } from 'mysql2/promise';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';

type CreatePool = typeof mysql.createPool;

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlPoolManager {
  @Inject()
  secretCrypto: SecretCryptoService;

  private pools = new Map<number, Pool>();
  private readonly createPool: CreatePool;

  constructor(createPool: CreatePool = mysql.createPool) {
    this.createPool = createPool;
  }

  async getPool(source: NetaClawDataSourceEntity): Promise<Pool> {
    const existing = this.pools.get(source.id);
    if (existing) return existing;

    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 3), 10));
    const pool = this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
    this.pools.set(source.id, pool);
    return pool;
  }

  async closePool(sourceId: number): Promise<void> {
    const pool = this.pools.get(sourceId);
    if (!pool) return;
    this.pools.delete(sourceId);
    await pool.end();
  }

  createTransientPool(source: NetaClawDataSourceEntity): Pool {
    const limit = Math.max(1, Math.min(Number(source.extra?.poolConnectionLimit ?? 1), 3));
    return this.createPool({
      host: source.host,
      port: source.port || 3306,
      database: source.database,
      user: source.username,
      password: source.passwordEncrypted ? this.secretCrypto.decryptText(source.passwordEncrypted) : undefined,
      waitForConnections: true,
      connectionLimit: limit,
      connectTimeout: Math.max(1000, Number(source.extra?.connectTimeout ?? 10000)),
      ssl: source.extra?.ssl as any,
    });
  }
}

  • Step 4: Run pool tests to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts

Expected: PASS.
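
The pool settings are clamped with the Math.max(min, Math.min(Number(value ?? fallback), max)) pattern. One caveat is that Number() yields NaN for non-numeric input, and Math.min and Math.max propagate NaN, so a value such as 'abc' in the JSON config would produce a NaN limit. A minimal sketch of a NaN-safe variant follows; clampSetting is a hypothetical helper name that the plan does not define.

```typescript
// Clamp a possibly missing or non-numeric setting into [min, max].
// Falls back to the default when the value does not parse to a finite number.
function clampSetting(value: unknown, fallback: number, min: number, max: number): number {
  const parsed = Number(value ?? fallback);
  const usable = Number.isFinite(parsed) ? parsed : fallback;
  return Math.max(min, Math.min(usable, max));
}

// Usage with the pool manager's bounds (default 3, range 1..10).
const requestedLimit = clampSetting(20, 3, 1, 10);    // above the cap, clamped down
const missingLimit = clampSetting(undefined, 3, 1, 10); // missing, uses the default
const garbageLimit = clampSetting('abc', 3, 1, 10);   // non-numeric, uses the default
```

The same helper would fit the other bounded settings (maxRows, maxJoinTables, queryTimeoutMs) if the plan's inline clamps were consolidated.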

  • Step 5: Implement DataSourceService

Create packages/backend/src/modules/netaclaw/service/data_source.ts:

import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import { NetaClawDataSourceEntity, NetaClawDataSourceExtra } from '../entity/data_source.js';
import { SecretCryptoService } from './secret_crypto.js';
import { MysqlPoolManager } from './mysql_pool.js';

export type AdminSafeDataSource = Omit<NetaClawDataSourceEntity, 'passwordEncrypted'> & {
  hasPassword: boolean;
};

export type AgentDataSourceSummary = Pick<NetaClawDataSourceEntity, 'name' | 'label' | 'database' | 'status'>;

export type SaveDataSourceInput = Partial<NetaClawDataSourceEntity> & {
  password?: string;
};

@Provide()
@Scope(ScopeEnum.Singleton)
export class NetaClawDataSourceService {
  @InjectEntityModel(NetaClawDataSourceEntity)
  repo: Repository<NetaClawDataSourceEntity>;

  @Inject()
  secretCrypto: SecretCryptoService;

  @Inject()
  mysqlPoolManager: MysqlPoolManager;

  toAdminSafe(entity: NetaClawDataSourceEntity): AdminSafeDataSource {
    const { passwordEncrypted, ...rest } = entity as any;
    return { ...rest, hasPassword: !!passwordEncrypted };
  }

  toAgentSummary(entity: NetaClawDataSourceEntity): AgentDataSourceSummary {
    return {
      name: entity.name,
      label: entity.label,
      database: entity.database,
      status: entity.status,
    };
  }

  async listAdminSafe(): Promise<AdminSafeDataSource[]> {
    const rows = await this.repo.find({ order: { id: 'DESC' } as any });
    return rows.map(row => this.toAdminSafe(row));
  }

  async listForAgent(agentId: number): Promise<AgentDataSourceSummary[]> {
    const rows = await this.repo.find({ where: { type: 'mysql', status: 1 } as any, order: { id: 'DESC' } as any });
    return rows
      .filter(row => Array.isArray(row.allowedAgentIds) && row.allowedAgentIds.includes(agentId))
      .map(row => this.toAgentSummary(row));
  }

  async getAuthorizedSource(name: string, agentId: number): Promise<NetaClawDataSourceEntity> {
    const source = await this.repo.findOne({ where: { name, type: 'mysql', status: 1 } as any });
    if (!source) throw new Error('data_source_not_found');
    if (!Array.isArray(source.allowedAgentIds) || !source.allowedAgentIds.includes(agentId)) {
      throw new Error('data_source_not_authorized');
    }
    return source;
  }

  normalizeExtra(extra?: NetaClawDataSourceExtra | null): NetaClawDataSourceExtra {
    return {
      allowedTables: extra?.allowedTables ?? [],
      blockedTables: extra?.blockedTables ?? [],
      maskedColumns: extra?.maskedColumns ?? {},
      schemaVisibility: extra?.schemaVisibility ?? 'allowed-only',
      queryTimeoutMs: Math.max(1000, Math.min(Number(extra?.queryTimeoutMs ?? 10000), 30000)),
      maxRows: Math.max(1, Math.min(Number(extra?.maxRows ?? 200), 1000)),
      maxJoinTables: Math.max(1, Math.min(Number(extra?.maxJoinTables ?? 4), 6)),
      poolConnectionLimit: Math.max(1, Math.min(Number(extra?.poolConnectionLimit ?? 3), 10)),
      ssl: extra?.ssl,
      connectTimeout: extra?.connectTimeout,
    };
  }

  async saveConfig(input: SaveDataSourceInput): Promise<AdminSafeDataSource> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const entity = this.repo.create({
      ...(existing || {}),
      ...input,
      type: 'mysql',
      readonly: 1,
      status: input.status ?? existing?.status ?? 1,
      port: Number(input.port ?? existing?.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
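      // Fall back to the stored extra so a partial update does not reset table/mask rules to defaults.
      extra: this.normalizeExtra(input.extra ?? existing?.extra),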
    });
    if (input.password) {
      entity.passwordEncrypted = this.secretCrypto.encryptText(input.password);
    }
    const saved = await this.repo.save(entity);
    await this.mysqlPoolManager.closePool(saved.id);
    return this.toAdminSafe(saved);
  }

  async delete(id: number): Promise<void> {
    await this.repo.delete(id);
    await this.mysqlPoolManager.closePool(id);
  }

  async testConnection(input: SaveDataSourceInput): Promise<{ ok: boolean; error?: string }> {
    const existing = input.id ? await this.repo.findOneBy({ id: input.id }) : null;
    const source = this.repo.create({
      ...(existing || {}),
      ...input,
      id: input.id ?? -1,
      type: 'mysql',
      readonly: 1,
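      // Mirror saveConfig: reuse stored values when testing an existing source with partial input.
      port: Number(input.port ?? existing?.port ?? 3306),
      allowedAgentIds: Array.isArray(input.allowedAgentIds) ? input.allowedAgentIds : existing?.allowedAgentIds ?? [],
      extra: this.normalizeExtra(input.extra ?? existing?.extra),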
      passwordEncrypted: input.password
        ? this.secretCrypto.encryptText(input.password)
        : input.passwordEncrypted ?? existing?.passwordEncrypted,
    } as any);
    let pool: any;
    try {
      pool = this.mysqlPoolManager.createTransientPool(source);
      await pool.query('SELECT 1 AS ok');
      return { ok: true };
    } catch (err: any) {
      return { ok: false, error: sanitizeMysqlConnectionError(err) };
    } finally {
      if (pool) await pool.end();
    }
  }
}

function sanitizeMysqlConnectionError(err: any): string {
  // Expose only the driver error code; the raw message can contain host or account details.
  const code = err?.code || err?.sqlState || err?.errno;
  return code ? `mysql_connection_failed:${code}` : 'mysql_connection_failed';
}
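
One detail of `normalizeExtra` worth illustrating: `Math.max` and `Math.min` return `NaN` when any argument is `NaN`, so a non-numeric value such as `'abc'` would pass through the clamps above as `NaN`. The following is a minimal sketch of a NaN-safe clamp, assuming a hypothetical helper named `clampNumber` that is not part of the plan; the bounds are the ones `normalizeExtra` uses.

```typescript
// Hypothetical helper (not in the plan): clamp a possibly non-numeric config
// value into [min, max], using the fallback when the input is not a finite number.
function clampNumber(value: unknown, fallback: number, min: number, max: number): number {
  const parsed = Number(value ?? fallback);
  const safe = Number.isFinite(parsed) ? parsed : fallback;
  return Math.max(min, Math.min(safe, max));
}

// Same bounds as normalizeExtra applies to queryTimeoutMs and maxRows.
const queryTimeoutMs = clampNumber(undefined, 10000, 1000, 30000); // missing value: default applies
const maxRows = clampNumber(5000, 200, 1, 1000); // too large: capped at the upper bound
const badInput = clampNumber('abc', 200, 1, 1000); // non-numeric: falls back to the default
```

If adopted, each `Math.max(..., Math.min(Number(...), ...))` expression in `normalizeExtra` could call such a helper instead; whether to reject invalid input outright rather than fall back is a design choice this sketch leaves open.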
  • Step 6: Run focused tests

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_pool_manager.test.ts test/secret_crypto.test.ts

Expected: PASS.

  • Step 7: Commit
git add packages/backend/src/modules/netaclaw/service/data_source.ts packages/backend/src/modules/netaclaw/service/mysql_pool.ts packages/backend/test/mysql_pool_manager.test.ts
git commit -m "feat: add mysql data source services"

Task 4: Schema Mapping And Query Execution

Files:

  • Create: packages/backend/src/modules/netaclaw/service/mysql_schema.ts

  • Modify: packages/backend/src/modules/netaclaw/service/mysql_query.ts

  • Test: packages/backend/test/mysql_schema_mapper.test.ts

  • Test: packages/backend/test/mysql_query_service.test.ts

  • Step 1: Write failing schema mapper test

Add packages/backend/test/mysql_schema_mapper.test.ts:

import { mapMysqlSchemaRows } from '../src/modules/netaclaw/service/mysql_schema.js';

describe('mapMysqlSchemaRows', () => {
  it('maps columns, primary keys, indexes and foreign keys', () => {
    const result = mapMysqlSchemaRows({
      columns: [
        { tableName: 'orders', tableComment: '订单', columnName: 'id', columnType: 'int', isNullable: 'NO', columnKey: 'PRI', columnComment: 'ID' },
        { tableName: 'orders', tableComment: '订单', columnName: 'customer_id', columnType: 'int', isNullable: 'NO', columnKey: 'MUL', columnComment: '客户' },
      ],
      indexes: [
        { tableName: 'orders', indexName: 'PRIMARY', columnName: 'id', nonUnique: 0, seqInIndex: 1 },
        { tableName: 'orders', indexName: 'idx_customer', columnName: 'customer_id', nonUnique: 1, seqInIndex: 1 },
      ],
      foreignKeys: [
        { tableName: 'orders', columnName: 'customer_id', constraintName: 'fk_orders_customer', referencedTableName: 'customers', referencedColumnName: 'id' },
      ],
      maskedColumns: { 'orders.customer_id': 'redact' },
    });

    expect(result.tables).toEqual([
      expect.objectContaining({
        name: 'orders',
        comment: '订单',
        primaryKey: ['id'],
        columns: [
          expect.objectContaining({ name: 'id', type: 'int', nullable: false, key: 'PRI', masked: false }),
          expect.objectContaining({ name: 'customer_id', type: 'int', nullable: false, key: 'MUL', masked: true, maskMode: 'redact' }),
        ],
        indexes: expect.arrayContaining([
          expect.objectContaining({ name: 'idx_customer', columns: ['customer_id'], unique: false }),
        ]),
        foreignKeys: [
          expect.objectContaining({ column: 'customer_id', referencedTable: 'customers', referencedColumn: 'id' }),
        ],
      }),
    ]);
  });
});
  • Step 2: Run schema mapper test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts

Expected: FAIL because mysql_schema.ts does not exist.

  • Step 3: Implement schema mapper and service skeleton

Create packages/backend/src/modules/netaclaw/service/mysql_schema.ts with:

import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { MysqlPoolManager } from './mysql_pool.js';

type MaskMode = 'hash' | 'partial' | 'redact';

export interface MysqlSchemaRows {
  columns: Array<{ tableName: string; tableComment?: string; columnName: string; columnType: string; isNullable: string; columnKey?: string; columnComment?: string }>;
  indexes: Array<{ tableName: string; indexName: string; columnName: string; nonUnique: number; seqInIndex: number }>;
  foreignKeys: Array<{ tableName: string; columnName: string; constraintName: string; referencedTableName: string; referencedColumnName: string }>;
  maskedColumns?: Record<string, MaskMode>;
}

export function mapMysqlSchemaRows(rows: MysqlSchemaRows) {
  const tableMap = new Map<string, any>();
  const getTable = (name: string, comment?: string) => {
    if (!tableMap.has(name)) {
      tableMap.set(name, { name, comment: comment || '', columns: [], primaryKey: [], indexes: [], foreignKeys: [] });
    }
    return tableMap.get(name);
  };

  for (const column of rows.columns) {
    const table = getTable(column.tableName, column.tableComment);
    const maskKey = `${column.tableName}.${column.columnName}`;
    const maskMode = rows.maskedColumns?.[maskKey];
    if (column.columnKey === 'PRI') table.primaryKey.push(column.columnName);
    table.columns.push({
      name: column.columnName,
      type: column.columnType,
      nullable: column.isNullable === 'YES',
      key: column.columnKey || '',
      comment: column.columnComment || '',
      masked: !!maskMode,
      maskMode: maskMode || null,
    });
  }

  const indexGroups = new Map<string, any>();
  for (const index of rows.indexes) {
    const key = `${index.tableName}.${index.indexName}`;
    if (!indexGroups.has(key)) {
      indexGroups.set(key, { tableName: index.tableName, name: index.indexName, unique: index.nonUnique === 0, columns: [] as string[] });
    }
    indexGroups.get(key).columns[index.seqInIndex - 1] = index.columnName;
  }
  for (const index of indexGroups.values()) {
    getTable(index.tableName).indexes.push({ name: index.name, unique: index.unique, columns: index.columns.filter(Boolean) });
  }

  for (const fk of rows.foreignKeys) {
    getTable(fk.tableName).foreignKeys.push({
      name: fk.constraintName,
      column: fk.columnName,
      referencedTable: fk.referencedTableName,
      referencedColumn: fk.referencedColumnName,
    });
  }

  return { tables: [...tableMap.values()] };
}

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlIntrospectionService {
  @Inject()
  poolManager: MysqlPoolManager;

  async listSchema(source: NetaClawDataSourceEntity, options: { tables?: string[] } = {}) {
    const allowedTables = source.extra?.allowedTables || [];
    const requested = options.tables?.length ? options.tables : allowedTables;
    const visibleTables = requested.filter(table => allowedTables.includes(table) && !(source.extra?.blockedTables || []).includes(table));
    if (visibleTables.length === 0) return { tables: [] };

    const pool = await this.poolManager.getPool(source);
    const placeholders = visibleTables.map(() => '?').join(',');
    const [columns] = await pool.query(
      `SELECT c.TABLE_NAME AS tableName, t.TABLE_COMMENT AS tableComment, c.COLUMN_NAME AS columnName, c.COLUMN_TYPE AS columnType, c.IS_NULLABLE AS isNullable, c.COLUMN_KEY AS columnKey, c.COLUMN_COMMENT AS columnComment
       FROM information_schema.COLUMNS c
       JOIN information_schema.TABLES t ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME
       WHERE c.TABLE_SCHEMA = ? AND c.TABLE_NAME IN (${placeholders})
       ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION`,
      [source.database, ...visibleTables]
    );
    const [indexes] = await pool.query(
      `SELECT TABLE_NAME AS tableName, INDEX_NAME AS indexName, COLUMN_NAME AS columnName, NON_UNIQUE AS nonUnique, SEQ_IN_INDEX AS seqInIndex
       FROM information_schema.STATISTICS
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders})
       ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX`,
      [source.database, ...visibleTables]
    );
    const [foreignKeys] = await pool.query(
      `SELECT TABLE_NAME AS tableName, COLUMN_NAME AS columnName, CONSTRAINT_NAME AS constraintName, REFERENCED_TABLE_NAME AS referencedTableName, REFERENCED_COLUMN_NAME AS referencedColumnName
       FROM information_schema.KEY_COLUMN_USAGE
       WHERE TABLE_SCHEMA = ? AND TABLE_NAME IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL`,
      [source.database, ...visibleTables]
    );
    return mapMysqlSchemaRows({
      columns: columns as MysqlSchemaRows['columns'],
      indexes: indexes as MysqlSchemaRows['indexes'],
      foreignKeys: foreignKeys as MysqlSchemaRows['foreignKeys'],
      maskedColumns: source.extra?.maskedColumns,
    });
  }

  async sampleTable(source: NetaClawDataSourceEntity, options: { table: string; columns?: string[]; limit?: number }) {
    const allowedTables = source.extra?.allowedTables || [];
    const blockedTables = source.extra?.blockedTables || [];
    if (!allowedTables.includes(options.table) || blockedTables.includes(options.table)) {
      throw new Error('table_not_allowed');
    }
    const schema = await this.listSchema(source, { tables: [options.table] });
    const table = schema.tables[0];
    if (!table) throw new Error('table_not_found');
    const visibleColumns = new Set(table.columns.map((column: any) => column.name));
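    // Default to unmasked columns so a table that has masked columns can still be sampled.
    const requestedColumns: string[] = options.columns?.length
      ? options.columns
      : table.columns.filter((column: any) => !column.masked).map((column: any) => column.name);
    if (requestedColumns.length === 0) throw new Error('column_not_allowed');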
    for (const column of requestedColumns) {
      if (!/^[A-Za-z0-9_]+$/.test(column) || !visibleColumns.has(column)) {
        throw new Error('column_not_allowed');
      }
      if (source.extra?.maskedColumns?.[`${options.table}.${column}`]) {
        throw new Error('masked_column_denied');
      }
    }
    const safeTable = `\`${options.table.replace(/`/g, '')}\``;
    const safeColumns = requestedColumns.map(column => `\`${column.replace(/`/g, '')}\``).join(', ');
    const limit = Math.max(1, Math.min(Number(options.limit ?? 5), 20));
    const pool = await this.poolManager.getPool(source);
    const [rows, fields] = await pool.query(`SELECT ${safeColumns} FROM ${safeTable} LIMIT ?`, [limit]);
    return {
      columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : requestedColumns,
      rows: Array.isArray(rows) ? rows : [],
      rowCount: Array.isArray(rows) ? rows.length : 0,
      truncated: false,
    };
  }
}
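
The identifier handling in `sampleTable` can be sketched in isolation. This is a minimal illustration, assuming two hypothetical helpers (`quoteIdentifier`, `buildSampleSql`) and a hypothetical error code `identifier_not_allowed`; it applies the same `[A-Za-z0-9_]` whitelist before quoting and keeps `LIMIT` as a bound parameter.

```typescript
// Hypothetical helper: accept only names matching the whitelist used in
// sampleTable, then wrap the name in backticks for use in a SQL string.
function quoteIdentifier(name: string): string {
  if (!/^[A-Za-z0-9_]+$/.test(name)) {
    throw new Error('identifier_not_allowed'); // hypothetical error code
  }
  return `\`${name}\``;
}

// Hypothetical helper: build the sampling statement from validated identifiers.
function buildSampleSql(table: string, columns: string[]): string {
  if (columns.length === 0) throw new Error('column_not_allowed');
  const columnList = columns.map(quoteIdentifier).join(', ');
  // The row limit stays a placeholder so the driver binds it as a value.
  return `SELECT ${columnList} FROM ${quoteIdentifier(table)} LIMIT ?`;
}

const sampleSql = buildSampleSql('orders', ['id', 'customer_id']);
```

Validating against a whitelist before quoting means a name containing a backtick or a space is rejected rather than silently rewritten, which is stricter than stripping backticks alone.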
  • Step 4: Run schema mapper test to verify GREEN

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts

Expected: PASS.

  • Step 5: Write failing query service test

Add packages/backend/test/mysql_query_service.test.ts:

import { MysqlQueryService } from '../src/modules/netaclaw/service/mysql_query.js';

describe('MysqlQueryService', () => {
  it('executes a guarded query and writes audit', async () => {
    const query = jest.fn().mockResolvedValue([
      [
        { id: 1, total: 99 },
      ],
      [{ name: 'id' }, { name: 'total' }],
    ]);
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn().mockResolvedValue({ query }) } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    const result = await service.executeReadOnly({
      source: {
        id: 1,
        database: 'sales',
        extra: {
          allowedTables: ['orders'],
          blockedTables: [],
          maskedColumns: {},
          maxRows: 200,
          maxJoinTables: 4,
          queryTimeoutMs: 10000,
        },
      } as any,
      sql: 'SELECT id, total FROM orders LIMIT 10',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-1',
    });

    expect(result.rows).toEqual([{ id: 1, total: 99 }]);
    expect(result.rowCount).toBe(1);
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      dataSourceId: 1,
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-1',
      status: 'success',
      rowCount: 1,
    }));
  });

  it('rejects masked column references before querying mysql', async () => {
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn() } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    await expect(service.executeReadOnly({
      source: {
        id: 1,
        extra: {
          allowedTables: ['orders'],
          blockedTables: [],
          maskedColumns: { 'orders.email': 'redact' },
          maxRows: 200,
          maxJoinTables: 4,
        },
      } as any,
      sql: 'SELECT email AS e FROM orders LIMIT 10',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-masked',
    })).rejects.toThrow('mysql_sql_rejected');

    expect(service.poolManager.getPool).not.toHaveBeenCalled();
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      status: 'rejected',
      rejectReason: 'masked_column_denied',
    }));
  });

  it('audits rejected SQL', async () => {
    const service = new MysqlQueryService();
    service.poolManager = { getPool: jest.fn() } as any;
    service.auditRepo = { save: jest.fn().mockResolvedValue({}) } as any;

    await expect(service.executeReadOnly({
      source: { id: 1, extra: { allowedTables: ['orders'], blockedTables: [] } } as any,
      sql: 'DROP TABLE orders',
      agentId: 3,
      userId: 5,
      toolCallId: 'tool-2',
    })).rejects.toThrow('mysql_sql_rejected');

    expect(service.poolManager.getPool).not.toHaveBeenCalled();
    expect(service.auditRepo.save).toHaveBeenCalledWith(expect.objectContaining({
      status: 'rejected',
    }));
  });
});
  • Step 6: Run query service test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_query_service.test.ts

Expected: FAIL because MysqlQueryService is not implemented.

  • Step 7: Implement query service

Extend packages/backend/src/modules/netaclaw/service/mysql_query.ts with:

import { Inject, Provide, Scope, ScopeEnum } from '@midwayjs/core';
import { InjectEntityModel } from '@midwayjs/typeorm';
import { Repository } from 'typeorm';
import * as crypto from 'crypto';
import { NetaClawDataSourceEntity } from '../entity/data_source.js';
import { NetaClawDataSourceQueryAuditEntity } from '../entity/data_source_query_audit.js';
import { MysqlPoolManager } from './mysql_pool.js';

export interface ExecuteReadOnlyParams {
  source: NetaClawDataSourceEntity;
  sql: string;
  agentId?: number;
  userId?: number;
  toolCallId?: string;
}

function hashSql(sql: string): string {
  return crypto.createHash('sha256').update(sql).digest('hex');
}

function sanitizeMysqlError(err: any): string {
  const code = err?.code || err?.sqlState || err?.errno;
  return code ? `mysql_query_failed:${code}` : 'mysql_query_failed';
}

@Provide()
@Scope(ScopeEnum.Singleton)
export class MysqlQueryService {
  @Inject()
  poolManager: MysqlPoolManager;

  @InjectEntityModel(NetaClawDataSourceQueryAuditEntity)
  auditRepo: Repository<NetaClawDataSourceQueryAuditEntity>;

  async executeReadOnly(params: ExecuteReadOnlyParams) {
    const start = Date.now();
    const extra = params.source.extra || {};
    const guard = validateMysqlReadOnlySql(params.sql, {
      allowedTables: extra.allowedTables || [],
      blockedTables: extra.blockedTables || [],
      maxJoinTables: extra.maxJoinTables || 4,
      maxRows: extra.maxRows || 200,
      maskedColumns: extra.maskedColumns || {},
    });

    if (!guard.ok) {
      await this.writeAudit(params, 'rejected', start, 0, guard.reason);
      throw new Error(`mysql_sql_rejected: ${guard.reason}`);
    }

    try {
      const pool = await this.poolManager.getPool(params.source);
      const maxRows = Math.max(1, Math.min(Number(extra.maxRows ?? 200), 1000));
      // The guard is assumed to append `LIMIT ?` only when the SQL has no LIMIT of its own;
      // binding maxRows + 1 lets truncation be detected without a second query.
      const appendedLimit = guard.limitedSql.includes('?');
      const sql = guard.limitedSql;
      const sqlParams = appendedLimit ? [maxRows + 1] : [];
      const [rows, fields] = await pool.query({ sql, timeout: Math.max(1000, Math.min(Number(extra.queryTimeoutMs ?? 10000), 30000)) } as any, sqlParams);
      const fetchedRows = Array.isArray(rows) ? rows as any[] : [];
      const truncated = appendedLimit && fetchedRows.length > maxRows;
      const resultRows = truncated ? fetchedRows.slice(0, maxRows) : fetchedRows;
      await this.writeAudit(params, 'success', start, resultRows.length, null);
      return {
        columns: Array.isArray(fields) ? fields.map((field: any) => field.name) : Object.keys(resultRows[0] || {}),
        rows: resultRows,
        rowCount: resultRows.length,
        truncated,
        elapsedMs: Date.now() - start,
        sql: params.sql,
      };
    } catch (err: any) {
      await this.writeAudit(params, 'failed', start, 0, null, err?.code || err?.errno || err?.sqlState);
      throw new Error(sanitizeMysqlError(err));
    }
  }

  private async writeAudit(
    params: ExecuteReadOnlyParams,
    status: 'success' | 'rejected' | 'failed',
    start: number,
    rowCount: number,
    rejectReason?: string | null,
    errorCode?: string | number | null
  ) {
    await this.auditRepo.save({
      dataSourceId: params.source.id,
      agentId: params.agentId ?? null,
      userId: params.userId ?? null,
      toolCallId: params.toolCallId ?? null,
      sqlHash: hashSql(params.sql),
      sqlPreview: params.sql.slice(0, 1000),
      status,
      rejectReason: rejectReason ?? null,
      elapsedMs: Date.now() - start,
      rowCount,
      errorCode: errorCode ? String(errorCode) : null,
    } as any);
  }
}
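
The truncation check in `executeReadOnly` relies on fetching one row more than the cap. A minimal sketch of that step on its own, assuming a hypothetical helper `capRows` and result shape `CappedResult`:

```typescript
// Hypothetical result shape, reduced to the fields relevant to truncation.
interface CappedResult<T> {
  rows: T[];
  rowCount: number;
  truncated: boolean;
}

// Given rows fetched with LIMIT maxRows + 1, keep at most maxRows and report
// whether the extra row was present (meaning more data exists than was returned).
function capRows<T>(fetched: T[], maxRows: number): CappedResult<T> {
  const truncated = fetched.length > maxRows;
  const rows = truncated ? fetched.slice(0, maxRows) : fetched;
  return { rows, rowCount: rows.length, truncated };
}

const exact = capRows([1, 2, 3], 3); // exactly at the cap: not truncated
const over = capRows([1, 2, 3, 4], 3); // one extra row: truncated to three
```

In the service, `truncated` is additionally gated on `appendedLimit`, since a LIMIT written by the Agent is not fetched with the extra row.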
  • Step 8: Run schema/query tests

Run:

pnpm --filter @neta/backend test -- --runInBand test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_sql_guard.test.ts

Expected: PASS.

  • Step 9: Commit
git add packages/backend/src/modules/netaclaw/service/mysql_schema.ts packages/backend/src/modules/netaclaw/service/mysql_query.ts packages/backend/test/mysql_schema_mapper.test.ts packages/backend/test/mysql_query_service.test.ts
git commit -m "feat: add mysql schema and query services"

Task 5: MySQL Agent Tools And Resolver Wiring

Files:

  • Create: packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts

  • Modify: packages/backend/src/modules/netaclaw/tools/catalog.ts

  • Modify: packages/backend/src/modules/netaclaw/tools/manifest.ts

  • Modify: packages/backend/src/modules/netaclaw/service/tool_resolver.ts

  • Test: packages/backend/test/tool_resolver.test.ts

  • Step 1: Write failing resolver tests

Append to packages/backend/test/tool_resolver.test.ts:

  it('resolves mysql toolset tools when enabled for an agent', async () => {
    service.toolRegistry = {
      all: jest.fn().mockResolvedValue([
        { name: 'mysql_list_sources', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
        { name: 'mysql_schema', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
        { name: 'mysql_table_sample', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
        { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
      ]),
    } as any;
    service.skillLoader = {} as any;
    service.dataSourceService = { listForAgent: jest.fn().mockResolvedValue([]) } as any;
    service.mysqlIntrospection = {} as any;
    service.mysqlQuery = {} as any;

    const result = await service.resolve({
      agent: { id: 9, toolsets: ['mysql'], tools: { disabled: [] } } as any,
      modelCapability: 'text',
    });

    expect(result.toolNames).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
    expect(result.tools.map(tool => tool.name)).toEqual(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);
  });

  it('routes mysql tools through main process proxy for subagent workers', async () => {
    service.toolRegistry = {
      all: jest.fn().mockResolvedValue([
        { name: 'mysql_query', status: 1, capability: 'text', promptHint: null, isCore: 0, toolset: 'mysql' },
      ]),
    } as any;
    service.skillLoader = {} as any;
    service.dataSourceService = {} as any;
    service.mysqlIntrospection = {} as any;
    service.mysqlQuery = {} as any;

    const result = await service.resolve({
      agent: {
        tools: { inheritCoreTools: false, enabled: ['mysql_query'], disabled: [] },
      } as any,
      modelCapability: 'text',
      delegationRole: 'subagent',
    });

    expect(result.toolManifest).toEqual([
      expect.objectContaining({
        name: 'mysql_query',
        supportedInWorker: false,
        workerRoutingHint: 'main-process-proxy',
      }),
    ]);
  });
  • Step 2: Run resolver test to verify RED

Run:

pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts

Expected: FAIL because mysql tools are not registered or injected.

  • Step 3: Implement MySQL tools

Create packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts:

import { Type } from '@sinclair/typebox';
import { AgentToolWithMeta, jsonResult } from '../common.js';
import { registerSchema } from '../catalog.js';
import { NetaClawDataSourceService } from '../../service/data_source.js';
import { MysqlIntrospectionService } from '../../service/mysql_schema.js';
import { MysqlQueryService } from '../../service/mysql_query.js';

export const TOOLSET_MYSQL = 'mysql' as const;

const Runtime = Type.Optional(Type.Any());
const SourceParam = Type.Object({
  source: Type.String({ description: '数据源名称' }),
  _netaRuntime: Runtime,
});

function readAgentId(params: any): number {
  const id = params?._netaRuntime?.currentAgent?.id;
  if (!id) throw new Error('current_agent_id_missing');
  return Number(id);
}

function readUserId(params: any): number | undefined {
  const id = params?._netaRuntime?.userId;
  return id === undefined || id === null ? undefined : Number(id);
}

export function createMysqlListSourcesTool(deps: {
  dataSourceService: NetaClawDataSourceService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Object({ _netaRuntime: Runtime });
  return {
    name: 'mysql_list_sources',
    label: 'MySQL 数据源列表',
    description: '列出当前 Agent 授权可用的 MySQL 数据源,不返回连接密码。',
    parameters: Params,
    async execute(_id, params) {
      return jsonResult({ sources: await deps.dataSourceService.listForAgent(readAgentId(params)) });
    },
  };
}

export function createMysqlSchemaTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    tables: Type.Optional(Type.Array(Type.String())),
  })]);
  return {
    name: 'mysql_schema',
    label: 'MySQL Schema',
    description: '读取授权 MySQL 表的字段、索引、主键和外键信息。',
    parameters: Params,
    async execute(_id, params) {
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlIntrospection.listSchema(source, { tables: params.tables }));
    },
  };
}

export function createMysqlTableSampleTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlIntrospection: MysqlIntrospectionService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    table: Type.String(),
    columns: Type.Optional(Type.Array(Type.String())),
    limit: Type.Optional(Type.Number()),
  })]);
  return {
    name: 'mysql_table_sample',
    label: 'MySQL 表样例',
    description: '读取授权表的少量样例值,用于判断字段含义和 JOIN key。',
    parameters: Params,
    async execute(_id, params) {
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlIntrospection.sampleTable(source, {
        table: params.table,
        columns: params.columns,
        limit: params.limit,
      }));
    },
  };
}

export function createMysqlQueryTool(deps: {
  dataSourceService: NetaClawDataSourceService;
  mysqlQuery: MysqlQueryService;
}): AgentToolWithMeta<any, unknown> {
  const Params = Type.Intersect([SourceParam, Type.Object({
    sql: Type.String(),
  })]);
  return {
    name: 'mysql_query',
    label: 'MySQL 查询',
    description: '执行授权数据源上的只读 SELECT SQL。支持授权表范围内的跨表 JOIN。',
    parameters: Params,
    async execute(id, params) {
      const source = await deps.dataSourceService.getAuthorizedSource(params.source, readAgentId(params));
      return jsonResult(await deps.mysqlQuery.executeReadOnly({
        source,
        sql: params.sql,
        agentId: readAgentId(params),
        userId: readUserId(params),
        toolCallId: id,
      }));
    },
  };
}

for (const name of ['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']) {
  registerSchema({
    name,
    toolset: TOOLSET_MYSQL,
    description: name,
    visibility: 'tool',
    capability: 'text',
    isCore: false,
    canDisable: true,
  });
}
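
Every tool that takes a `source` resolves the caller's Agent id from `_netaRuntime` and then asks the data-source service for an authorized source. A minimal sketch of that flow with in-memory stand-ins; the `RuntimeContext` and `ToolParams` shapes, the `sources` array, and `resolveSource` are assumptions for illustration, not the real `AgentToolWithMeta` or service types.

```typescript
// Assumed, simplified shapes for illustration only.
interface RuntimeContext {
  currentAgent?: { id?: number | string };
}
interface ToolParams {
  source?: string;
  _netaRuntime?: RuntimeContext;
}

// Same logic as readAgentId in the tool file.
function readAgentId(params: ToolParams): number {
  const id = params._netaRuntime?.currentAgent?.id;
  if (!id) throw new Error('current_agent_id_missing');
  return Number(id);
}

// In-memory stand-in for NetaClawDataSourceService.getAuthorizedSource.
const sources = [{ name: 'sales', allowedAgentIds: [9] }];
function getAuthorizedSource(name: string, agentId: number) {
  const source = sources.find(item => item.name === name);
  if (!source) throw new Error('data_source_not_found');
  if (!source.allowedAgentIds.includes(agentId)) throw new Error('data_source_not_authorized');
  return source;
}

// Hypothetical wrapper: the first two steps each source-bound tool performs.
function resolveSource(params: ToolParams) {
  return getAuthorizedSource(params.source ?? '', readAgentId(params));
}
```

Because the Agent id comes from the runtime context rather than from a model-supplied argument, the model cannot widen its own access by naming another Agent.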
  • Step 4: Wire catalog, manifest, resolver

Modify packages/backend/src/modules/netaclaw/tools/catalog.ts:

export const TOOLSET_MYSQL = 'mysql' as const;
import './builtin/mysql.js';

Modify packages/backend/src/modules/netaclaw/tools/manifest.ts:

const MAIN_PROCESS_PROXY_TOOLS = new Set([
  'write_file',
  'edit',
  'patch',
  'read_skill',
  'read_skill_file',
  'skill_manage',
  'memory_save',
  'memory_recall',
  'mysql_list_sources',
  'mysql_schema',
  'mysql_table_sample',
  'mysql_query',
]);

export function toToolKind(name: string): SubagentToolManifestItem['kind'] {
  if (name.startsWith('mysql_')) return 'custom';
  if (name.startsWith('memory_')) return 'memory';
  if (name.startsWith('read_skill') || name === 'skill_manage') return 'skill';
  if (name === 'delegate_task' || name === 'delegate_parallel') return 'delegation';
  if (name === 'escalate') return 'admin';
  if (
    name === 'bash'
    || name === 'read_file'
    || name === 'write_file'
    || name === 'list_dir'
    || name === 'find_files'
    || name === 'grep'
    || name === 'edit'
    || name === 'patch'
    || name === 'clarify'
  ) {
    return 'builtin';
  }
  return 'custom';
}
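
How membership in `MAIN_PROCESS_PROXY_TOOLS` might translate into the manifest fields asserted by the resolver test can be sketched as follows. The `ManifestRouting` shape and `toRouting` function are hypothetical, and the `'worker-local'` hint for non-proxied tools is a placeholder; only `supportedInWorker: false` and `'main-process-proxy'` come from the test in Step 1.

```typescript
// Assumed shape, reduced to the fields the resolver test checks.
interface ManifestRouting {
  name: string;
  supportedInWorker: boolean;
  workerRoutingHint: string;
}

// Subset of MAIN_PROCESS_PROXY_TOOLS relevant to this task.
const PROXY_TOOLS = new Set(['mysql_list_sources', 'mysql_schema', 'mysql_table_sample', 'mysql_query']);

// Hypothetical mapping from a tool name to its routing fields.
function toRouting(name: string): ManifestRouting {
  const proxied = PROXY_TOOLS.has(name);
  return {
    name,
    supportedInWorker: !proxied,
    // 'worker-local' is a placeholder value, not taken from the codebase.
    workerRoutingHint: proxied ? 'main-process-proxy' : 'worker-local',
  };
}

const mysqlRouting = toRouting('mysql_query');
const grepRouting = toRouting('grep');
```

Routing the MySQL tools through the main process keeps the connection pool, decrypted credentials, and audit repository out of subagent workers.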

Modify packages/backend/src/modules/netaclaw/service/tool_resolver.ts:

  • Import services and factories:
import { NetaClawDataSourceService } from './data_source.js';
import { MysqlIntrospectionService } from './mysql_schema.js';
import { MysqlQueryService } from './mysql_query.js';
import {
  createMysqlListSourcesTool,
  createMysqlQueryTool,
  createMysqlSchemaTool,
  createMysqlTableSampleTool,
} from '../tools/builtin/mysql.js';
  • Add injected services:
@Inject()
dataSourceService: NetaClawDataSourceService;

@Inject()
mysqlIntrospection: MysqlIntrospectionService;

@Inject()
mysqlQuery: MysqlQueryService;
  • Add to getBaseToolMap():
['mysql_list_sources', createMysqlListSourcesTool({ dataSourceService: this.dataSourceService })],
['mysql_schema', createMysqlSchemaTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_table_sample', createMysqlTableSampleTool({ dataSourceService: this.dataSourceService, mysqlIntrospection: this.mysqlIntrospection })],
['mysql_query', createMysqlQueryTool({ dataSourceService: this.dataSourceService, mysqlQuery: this.mysqlQuery })],
  • Step 5: Run resolver tests

Run:

pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts

Expected: PASS.

  • Step 6: Commit
git add packages/backend/src/modules/netaclaw/tools/builtin/mysql.ts packages/backend/src/modules/netaclaw/tools/catalog.ts packages/backend/src/modules/netaclaw/tools/manifest.ts packages/backend/src/modules/netaclaw/service/tool_resolver.ts packages/backend/test/tool_resolver.test.ts
git commit -m "feat: register mysql agent tools"

Task 6: Admin Controller And Prompt Skill

Files:

  • Create: packages/backend/src/modules/netaclaw/controller/admin/data_source.ts

  • Create: packages/backend/skills/data-analyst-mysql/SKILL.md

  • Step 1: Implement admin controller

Create packages/backend/src/modules/netaclaw/controller/admin/data_source.ts:

import { Body, Controller, Get, Inject, Post, Provide, Query } from '@midwayjs/core';
import { NetaClawDataSourceService } from '../../service/data_source.js';

@Provide()
@Controller('/admin/netaclaw/data-source')
export class NetaClawDataSourceAdminController {
  @Inject()
  dataSourceService: NetaClawDataSourceService;

  @Get('/list')
  async list(@Query('agentId') agentId?: number) {
    if (agentId) {
      return { code: 1000, data: await this.dataSourceService.listForAgent(Number(agentId)) };
    }
    return { code: 1000, data: await this.dataSourceService.listAdminSafe() };
  }

  @Post('/save')
  async save(@Body() body: any) {
    return { code: 1000, data: await this.dataSourceService.saveConfig(body) };
  }

  @Post('/delete')
  async delete(@Body() body: { id: number }) {
    await this.dataSourceService.delete(Number(body.id));
    return { code: 1000, message: 'success' };
  }

  @Post('/test')
  async test(@Body() body: any) {
    const result = await this.dataSourceService.testConnection(body);
    return result.ok
      ? { code: 1000, data: result }
      : { code: 1001, message: result.error || 'connection_failed', data: result };
  }
}
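
The success/failure mapping in the `/test` handler can be pulled into a pure function so it is easy to assert on without booting Midway. The following is a minimal sketch, not part of the plan's file list: `toTestEnvelope`, `TestConnectionResult`, and the `latencyMs` field are hypothetical names, and only `ok` and `error` are implied by the controller above.

```typescript
// Hypothetical result shape: the controller above only relies on `ok` and `error`;
// `latencyMs` is an illustrative extra field.
interface TestConnectionResult {
  ok: boolean;
  error?: string;
  latencyMs?: number;
}

// Response envelope used by the admin endpoints in this plan.
interface Envelope<T> {
  code: number;
  message?: string;
  data?: T;
}

// Mirrors the ternary in the /test handler: 1000 on success, 1001 otherwise,
// falling back to 'connection_failed' when the service gives no error text.
function toTestEnvelope(result: TestConnectionResult): Envelope<TestConnectionResult> {
  if (result.ok) {
    return { code: 1000, data: result };
  }
  return { code: 1001, message: result.error || 'connection_failed', data: result };
}

// Usage: one successful probe and one failure without an error string.
const okEnvelope = toTestEnvelope({ ok: true, latencyMs: 12 });
const failedEnvelope = toTestEnvelope({ ok: false });
```

Keeping `data` populated in the failure branch, as the controller does, lets the admin UI show diagnostic details alongside the error message.
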
  • Step 2: Add prompt skill

Create packages/backend/skills/data-analyst-mysql/SKILL.md:

---
name: data-analyst-mysql
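description: Read-only intelligent data questioning with the MySQL tools, covering schema analysis, cross-table JOINs, metric-definition clarification, and explanation of SQL results.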
version: 1.0.0
metadata:
  skillType: llm
  tags: [mysql, data-analysis, question-answering]
  conditions:
    requires_tools: ["mysql_list_sources", "mysql_schema", "mysql_query"]
---

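# MySQL Intelligent Data Questioning

You answer business data questions using the MySQL tools. All database access must go through the `mysql_*` tools.

## Workflow

1. First call `mysql_list_sources` to see which data sources are available to the current Agent.
2. Choose a data source based on the user's question; if you cannot tell which one applies, ask for clarification first.
3. Call `mysql_schema` to read the relevant table structures, columns, primary keys, indexes, and foreign keys.
4. For cross-table JOINs, prefer foreign keys; when there are none, infer the relationship from column names, types, indexes, and the sample values returned by `mysql_table_sample`.
5. When the business metric definition is unclear, clarify it first; do not guess.
6. Only call `mysql_query` to run read-only SELECT statements.

## JOIN Rules

- Every JOIN must have an explicit `ON` or `USING` condition.
- When you are unsure about the join columns, explain the join you intend to use and ask the user first.
- If a join is inferred, the final answer must state "this join is inferred".
- When a query times out or is rejected, narrow the time range, reduce the number of tables, or ask the user for clarification.

## Answer Format

The answer must include:

- Conclusion.
- SQL.
- Metric definition notes, especially the time range, filter conditions, and JOIN columns.
- Limitations, such as LIMIT, sampling, masked columns, inferred joins, or missing columns.

Do not expose the database host, account, password, connection string, or information about unauthorized tables.
Do not promise "full-population statistics" unless the SQL metric definition and filter scope are explicit.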
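
Outside the skill file itself, the `requires_tools` condition in the front matter means the skill should only be offered when every listed tool has been resolved for the Agent. How NetaClaw evaluates `conditions` is not shown in this section, so the following is only a sketch of the idea; `SkillConditions`, `missingRequiredTools`, and `isSkillEnabled` are hypothetical names.

```typescript
// Hypothetical subset of the SKILL.md front matter that matters for gating.
interface SkillConditions {
  requires_tools?: readonly string[];
}

// Returns the required tools that are absent from the Agent's resolved tool names.
function missingRequiredTools(
  conditions: SkillConditions,
  availableTools: readonly string[],
): string[] {
  const available = new Set(availableTools);
  return (conditions.requires_tools ?? []).filter((name) => !available.has(name));
}

// A skill is enabled only when none of its required tools are missing.
function isSkillEnabled(conditions: SkillConditions, availableTools: readonly string[]): boolean {
  return missingRequiredTools(conditions, availableTools).length === 0;
}

// Usage with the conditions declared by data-analyst-mysql.
const mysqlSkillConditions: SkillConditions = {
  requires_tools: ['mysql_list_sources', 'mysql_schema', 'mysql_query'],
};
const enabledWithFullToolset = isSkillEnabled(mysqlSkillConditions, [
  'mysql_list_sources',
  'mysql_schema',
  'mysql_table_sample',
  'mysql_query',
]);
const missingWithoutQuery = missingRequiredTools(mysqlSkillConditions, [
  'mysql_list_sources',
  'mysql_schema',
]);
```

Note that `mysql_table_sample` is not in `requires_tools`, so under this reading the skill stays enabled even when sampling is unavailable and the Agent has to infer joins from schema metadata alone.
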
  • Step 3: Run focused tests

Run:

pnpm --filter @neta/backend test -- --runInBand test/tool_resolver.test.ts test/skill_env_schema.test.ts

Expected: PASS.

  • Step 4: Commit
git add packages/backend/src/modules/netaclaw/controller/admin/data_source.ts packages/backend/skills/data-analyst-mysql/SKILL.md
git commit -m "feat: add mysql data source admin api and analyst skill"

Task 7: Verification And Documentation Sync

Files:

  • Modify: docs/code-wiki/entities/tool-system.md

  • Modify: docs/code-wiki/entities/skill-system.md

  • Modify: docs/code-wiki/entities/netaclaw-module.md

  • Modify: docs/code-wiki/comparisons/database-entity-overview.md

  • Step 1: Update code wiki

Add concise references:

  • tool-system.md: add mysql_list_sources, mysql_schema, mysql_table_sample, mysql_query under a "MySQL question answering" toolset.

  • skill-system.md: add data-analyst-mysql as a prompt skill.

  • netaclaw-module.md: increment table count and list netaclaw_data_source, netaclaw_data_source_query_audit.

  • database-entity-overview.md: add field summary for both new tables.

  • Step 2: Run all focused backend tests

Run:

pnpm --filter @neta/backend test -- --runInBand test/secret_crypto.test.ts test/mysql_sql_guard.test.ts test/mysql_schema_mapper.test.ts test/mysql_query_service.test.ts test/mysql_pool_manager.test.ts test/tool_resolver.test.ts test/entity_exports.test.ts

Expected: PASS.

  • Step 3: Run backend build

Run:

pnpm --filter @neta/backend build

Expected: exit code 0.

  • Step 4: Inspect final diff

Run:

git status --short
git diff --stat

Expected: only MySQL question answering implementation files and docs are changed.

  • Step 5: Commit
git add docs/code-wiki packages/backend
git commit -m "docs: update code wiki for mysql question answering"

Final Verification Checklist

  • SQL guard rejects SHOW, DESCRIBE, EXPLAIN, comments, semicolons, DML, DDL, UNION, CTEs, user variables, dangerous functions, blocked tables, tables outside the allowlist, JOINs without a condition, and queries that join too many tables.
  • Data source password is AES-GCM encrypted; admin APIs never return password ciphertext/plaintext, and mysql_list_sources never returns host, username, password, connection string, or permission details.
  • mysql_schema returns only authorized table metadata and marks masked columns.
  • mysql_table_sample validates table and column identifiers against schema plus allowlist/blocklist, and rejects masked columns.
  • mysql_query rejects masked column references, an explicit LIMIT above maxRows, any JOIN missing ON/USING, blocked tables, and tables outside the allowlist before any SQL reaches MySQL.
  • mysql_query writes an audit record for successful, rejected, and failed SQL.
  • MySQL tools are visible only when selected by mysql toolset or explicit tool config.
  • Subagent worker route for MySQL tools is main-process-proxy, not worker-local.
  • data-analyst-mysql skill is discoverable and condition-gated by MySQL tools.
  • Focused tests pass.
  • Backend build passes.
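
The checklist item about mysql_list_sources never returning host, username, password, or connection details is easiest to keep true when the agent-facing shape is built from an explicit allowlist of fields rather than by deleting sensitive ones. The following is a sketch under assumptions: the field names on `StoredDataSource` and the function `toAgentVisibleSource` are hypothetical and may not match the netaclaw_data_source entity defined earlier in this plan.

```typescript
// Hypothetical stored row; real column names live in the data_source entity.
interface StoredDataSource {
  id: number;
  name: string;
  description?: string;
  host: string;
  port: number;
  username: string;
  passwordCiphertext: string;
  allowTables?: string[];
}

// The only fields an Agent is allowed to see in this sketch.
interface AgentVisibleSource {
  id: number;
  name: string;
  description: string;
}

// Copies allowlisted fields one by one, so a column added to the stored row
// later is not exposed unless it is added here on purpose.
function toAgentVisibleSource(source: StoredDataSource): AgentVisibleSource {
  return {
    id: source.id,
    name: source.name,
    description: source.description ?? '',
  };
}

// Usage with placeholder values (not real credentials).
const storedSource: StoredDataSource = {
  id: 1,
  name: 'orders',
  description: 'Order warehouse',
  host: 'db.internal.example',
  port: 3306,
  username: 'report_ro',
  passwordCiphertext: 'placeholder-ciphertext',
  allowTables: ['orders', 'order_items'],
};
const visibleSource = toAgentVisibleSource(storedSource);
```

A test for the real tool could serialize its output and assert that none of the connection fields appear in it, which covers both direct properties and anything nested.
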