import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; import { SubagentProcessRunner } from '../src/modules/netaclaw/subagent/process_runner.js'; import { SUBAGENT_WORKER_PROTOCOL_VERSION, SubagentTaskEnvelope, } from '../src/modules/netaclaw/subagent/process_protocol.js'; describe('SubagentProcessRunner', () => { const tempDirs: string[] = []; afterEach(() => { tempDirs.splice(0).forEach(dir => { fs.rmSync(dir, { recursive: true, force: true }); }); }); const writeWorker = (content: string): string => { const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'neta-subagent-worker-')); tempDirs.push(dir); const workerPath = path.join(dir, 'worker.js'); fs.writeFileSync(workerPath, content, 'utf8'); return workerPath; }; const createEnvelope = (overrides: Partial = {}): SubagentTaskEnvelope => ({ protocolVersion: SUBAGENT_WORKER_PROTOCOL_VERSION, runId: 'run-1', sessionId: 'session-1', parentMessageId: 12, parentToolCallId: 'tool-call-1', parentAgentId: 88, parentEntryId: 'entry-1', task: { goal: 'Summarize the repo', context: 'Focus on backend boundaries.', agentId: 77, agentName: 'researcher', }, agentConfig: { name: 'researcher', systemPrompt: 'You are isolated.', model: 'openai:gpt-5.4', apiKey: 'test-key', maxToolRounds: 3, }, toolNames: ['bash'], ...overrides, }); it('spawns a worker process, streams JSONL events, and resolves on run_end', async () => { const workerPath = writeWorker(` process.stdin.resume(); process.stdin.setEncoding('utf8'); let buffer = ''; process.stdin.on('data', chunk => { buffer += chunk; const index = buffer.indexOf('\\n'); if (index === -1) return; const envelope = JSON.parse(buffer.slice(0, index)); const timestamp = new Date().toISOString(); process.stdout.write(JSON.stringify({ type: 'run_start', runId: envelope.runId, timestamp }) + '\\n'); process.stdout.write(JSON.stringify({ type: 'token', runId: envelope.runId, timestamp, text: 'hello' }) + '\\n'); process.stdout.write(JSON.stringify({ type: 'run_end', runId: envelope.runId, timestamp, result: { finalContent: 'worker result', usage: { inputTokens: 3, outputTokens: 4 }, toolCallCount: 1 } }) + '\\n'); }); `); const events: string[] = []; const runner = new SubagentProcessRunner({ workerPath }); const resultPromise = runner.run(createEnvelope(), event => { events.push(event.type); }); expect(runner.getActiveRunIds()).toEqual(['run-1']); await expect(resultPromise).resolves.toEqual({ finalContent: 'worker result', usage: { inputTokens: 3, outputTokens: 4 }, toolCallCount: 1, }); expect(events).toEqual(['run_start', 'token', 'run_end']); expect(runner.getActiveRunIds()).toEqual([]); }); it('handles main-process proxy tool calls over worker stdin/stdout JSONL', async () => { const workerPath = writeWorker(` process.stdin.resume(); process.stdin.setEncoding('utf8'); let buffer = ''; let envelope = null; process.stdin.on('data', chunk => { buffer += chunk; const lines = buffer.split(/\\r?\\n/); buffer = lines.pop() || ''; for (const line of lines) { if (!line.trim()) continue; const message = JSON.parse(line); const timestamp = new Date().toISOString(); if (!envelope) { envelope = message; process.stdout.write(JSON.stringify({ type: 'proxy_tool_call', runId: envelope.runId, timestamp, requestId: 'proxy-1', name: 'write_file', toolCallId: 'tool-call-proxy', args: { path: '/tmp/example.txt', content: 'hello' } }) + '\\n'); continue; } process.stdout.write(JSON.stringify({ type: 'run_end', runId: envelope.runId, timestamp, result: { finalContent: message.result, usage: { inputTokens: 1, outputTokens: 2 }, toolCallCount: 1 } }) + '\\n'); } }); `); const events: string[] = []; const runner = new SubagentProcessRunner({ workerPath }); await expect(runner.run( createEnvelope(), event => events.push(event.type), async event => `proxied:${event.name}:${event.args.content}` )).resolves.toEqual({ finalContent: 'proxied:write_file:hello', usage: { inputTokens: 1, outputTokens: 2 }, toolCallCount: 1, }); expect(events).toEqual(['proxy_tool_call', 'run_end']); expect(runner.getActiveRunIds()).toEqual([]); }); it('rejects malformed worker JSONL output', async () => { const workerPath = writeWorker(` process.stdout.write('not-json\\n'); `); const runner = new SubagentProcessRunner({ workerPath }); await expect(runner.run(createEnvelope())).rejects.toThrow( 'Invalid subagent worker JSONL event' ); expect(runner.getActiveRunIds()).toEqual([]); }); it('rejects non-zero exits when the worker does not emit a terminal event', async () => { const workerPath = writeWorker(` process.stderr.write('boom'); process.exit(7); `); const runner = new SubagentProcessRunner({ workerPath }); await expect(runner.run(createEnvelope())).rejects.toThrow( 'Subagent process exited with code 7: boom' ); expect(runner.getActiveRunIds()).toEqual([]); }); it('can cancel an active worker process by runId', async () => { const workerPath = writeWorker(` process.stdin.resume(); setInterval(() => {}, 1000); `); const runner = new SubagentProcessRunner({ workerPath }); const resultPromise = runner.run(createEnvelope({ runId: 'run-cancel' })); await new Promise(resolve => setTimeout(resolve, 100)); expect(runner.getActiveRunIds()).toEqual(['run-cancel']); expect(runner.cancel('run-cancel')).toBe(true); await expect(resultPromise).rejects.toThrow('Subagent process exited with code'); expect(runner.getActiveRunIds()).toEqual([]); }); });