import { eq, and } from "drizzle-orm";
import { db } from "../db/client.js";
import { actors as actorsTable, opencodeSessions } from "../db/schema.js";
import { log } from "../log.js";
import { OpencodeClient } from "../lib/opencode.js";
import { opencodeUrlFor } from "../docker/manager.js";

/** Everything needed to run one sub-agent task on behalf of a user. */
export type SubAgentRunInput = {
  /** Owner of the task; used to resolve the OpenCode target and scope DB rows. */
  userId: string;
  /** Actor id of the sub-agent; stored on the session row and echoed in events. */
  subAgentId: string;
  /** Sub-agent type label; only used to build the session title. */
  type: string;
  /** Prompt text forwarded as-is to the OpenCode session. */
  prompt: string;
  /** Channel id echoed back in every emitted event. */
  channelId: string;
  /** Sink for "sub-agent-event" notifications (started / progress / done / error). */
  onEvent: (event: string, data: unknown) => void;
};

/**
 * Runs a single sub-agent task by opening an OpenCode session and forwarding
 * the user-provided prompt. Streams events back to the caller (the Grow Agent
 * actor's broadcast surface) and updates the actors table on completion.
 *
 * Sub-agents do NOT spawn their own containers — they multiplex through the
 * parent Grow Agent's OpenCode container (PRD §3.3).
 *
 * Failures are not rethrown: they are logged, the actor row is marked
 * "error" (best-effort), and an "error" event is emitted through `onEvent`.
 *
 * @param input - Task description and event sink; see {@link SubAgentRunInput}.
 * @returns A promise that settles once the task has finished or failed.
 */
export async function runSubAgentTask(input: SubAgentRunInput): Promise<void> {
  const { userId, subAgentId, type, prompt, channelId, onEvent } = input;

  try {
    const target = await opencodeUrlFor(userId);
    if (!target) {
      throw new Error("OpenCode container not provisioned for user");
    }

    const client = new OpencodeClient(target.baseUrl, target.password);
    const session = await client.createSession({
      title: `${type} :: ${subAgentId}`,
    });

    await db.insert(opencodeSessions).values({
      id: session.id,
      userId,
      actorId: subAgentId,
      title: session.title ?? null,
    });

    onEvent("sub-agent-event", {
      subAgentId,
      type: "started",
      channelId,
      sessionId: session.id,
    });

    // Open SSE stream for live progress.
    const aborter = client.streamEvents((ev) => {
      onEvent("sub-agent-event", {
        subAgentId,
        type: "progress",
        channelId,
        event: ev.event,
        data: ev.data,
      });
    });

    // Send the prompt synchronously and capture the final response text.
    // The stream is closed in `finally` so a failed send does not leave the
    // SSE connection open (and emitting "progress" after the "error" event).
    let result: Awaited<ReturnType<OpencodeClient["sendMessage"]>>;
    try {
      result = await client.sendMessage({
        sessionId: session.id,
        text: prompt,
      });
    } finally {
      aborter.abort();
    }

    await db
      .update(actorsTable)
      .set({ status: "done", lastActivityAt: new Date() })
      .where(
        and(
          eq(actorsTable.userId, userId),
          eq(actorsTable.actorId, subAgentId),
        ),
      );

    onEvent("sub-agent-event", {
      subAgentId,
      type: "done",
      channelId,
      result,
    });
    log.info({ subAgentId, sessionId: session.id }, "sub-agent done");
  } catch (err) {
    log.error({ err, subAgentId }, "sub-agent failed");

    // Best-effort status update: a DB failure here must not mask the original
    // error or prevent the "error" event from reaching the caller, but it is
    // logged instead of being dropped silently.
    try {
      await db
        .update(actorsTable)
        .set({ status: "error", lastActivityAt: new Date() })
        .where(
          and(
            eq(actorsTable.userId, userId),
            eq(actorsTable.actorId, subAgentId),
          ),
        );
    } catch (updateErr) {
      log.error(
        { err: updateErr, subAgentId },
        "failed to mark sub-agent as errored",
      );
    }

    onEvent("sub-agent-event", {
      subAgentId,
      type: "error",
      channelId,
      message: err instanceof Error ? err.message : String(err),
    });
  }
}